Indexer's Content Manager Rest API implementation (#662)

* First version of Indexer content manager API

* Include TODOs and make responses clearer

* Add Changelog entry and improve error logic

* Refactor clases and add models

* Add tests and small improvements

* Add basicAuth to OpenAPI file

* Add documentation

* Apply changes from code review

* Change models from singleton to normal clases

* Change models to use Jackson, Change Credentials class for Token class

* Add documentation about the new roles

* Apply changes from code review

* Refactoring

* Refactoring

* Update documentation to the latests refactors

* Refactor PlanService to use the Token model

* Refactor AuthService to use the Token model

* Refactor AuthService to use the Subscription model

* Reorganize code

* Refactor RestPostSubscriptionAction

* Small fixes on RestPostSubscription

* Refactor RestGetSubscription

* Remove unnused imports

* Refactor RestDeleteSubscription

* Start the refactor of RestPostUpdate

* Refactor RestPostUpdate

* Add Tests

* Add javadocs and improve documentation

* Apply changes from code review

* Add Content Manager docs

* Update documentation

* Fix Documentation

---------

Signed-off-by: Jorge Sánchez <jorge.sanchez@wazuh.com>
Co-authored-by: Alex Ruiz <alejandro.ruiz.becerra@wazuh.com>
This commit is contained in:
Jorge Sánchez 2025-11-26 15:00:15 +01:00 committed by GitHub
parent ffab688f14
commit 0ee82a3cca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 1699 additions and 215 deletions

View File

@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement pre-processing ECS sources types sanitization [(#628)](https://github.com/wazuh/wazuh-indexer-plugins/pull/628)
- Add Security Compliance fields to the WCS [(#643)](https://github.com/wazuh/wazuh-indexer-plugins/pull/643)
- Initialize indexer content manager [(#651)](https://github.com/wazuh/wazuh-indexer-plugins/pull/651)
- Indexer's Content Manager Rest API implementation [(#662)](https://github.com/wazuh/wazuh-indexer-plugins/pull/662)
- Implement Imposter mock server for CTI API [#661](https://github.com/wazuh/wazuh-indexer-plugins/pull/661)
- Implement authentication in CTI Console [#666](https://github.com/wazuh/wazuh-indexer-plugins/pull/666)
- Initialize consumers metadata index on start [(#668)](https://github.com/wazuh/wazuh-indexer-plugins/pull/668)

View File

@ -15,6 +15,7 @@
- [Setup](dev/plugins/setup.md)
- [Security](dev/plugins/security.md)
- [Reporting](dev/plugins/reporting.md)
- [Content Manager](dev/plugins/content-manager.md)
# Reference Manual
- [Introduction]()
@ -34,6 +35,9 @@
- [Architecture](ref/modules/setup/architecture.md)
- [API Reference]()
- [Wazuh Common Schema](ref/modules/setup/schema.md)
- [Content Manager](ref/modules/content-manager/index.md)
- [Architecture](ref/modules/content-manager/architecture.md)
- [API Reference](ref/modules/content-manager/api.md)
- [Reporting](ref/modules/reporting/index.md)
- [Usage](ref/modules/reporting/usage.md)
- [Upgrade](ref/upgrade.md)

View File

@ -14,7 +14,7 @@ The Content Manager plugin handles:
The plugin manages two main indices:
- `wazuh-ruleset`: Contains the actual security content (rules, decoders, etc.)
- `wazuh-context`: Stores consumer information and synchronization state
- `.cti-consumers`: Stores consumer information and synchronization state
---
@ -25,7 +25,7 @@ The plugin manages two main indices:
#### 1. **ContentManagerPlugin**
Main class located at: `/plugins/content-manager/src/main/java/com/wazuh/contentmanager/ContentManagerPlugin.java`
This is the entry point of the plugin. It initializes when the `wazuh-ruleset` index is created by the setup plugin.
This is the entry point of the plugin.
#### 2. **ContentIndex**
Located at: `/plugins/content-manager/src/main/java/com/wazuh/contentmanager/index/ContentIndex.java`
@ -35,13 +35,14 @@ Manages operations on the `wazuh-ruleset` index:
- Document patching (add, update, delete)
- Query and retrieval operations
#### 3. **ContextIndex**
Located at: `/plugins/content-manager/src/main/java/com/wazuh/contentmanager/index/ContextIndex.java`
#### 3. **ConsumersIndex**
Located at: `/plugins/content-manager/src/main/java/com/wazuh/contentmanager/index/ConsumersIndex.java`
Manages the `wazuh-context` index which stores:
- Consumer ID and context information
- Current offset (last successfully applied change)
- Last available offset from the CTI API
Manages the `.cti-consumers` index which stores:
- Consumer name
- Local offset (last successfully applied change)
- Remote offset (last available offset from the CTI API)
- Snapshot link from where the index was initialized
#### 4. **ContentUpdater**
Located at: `/plugins/content-manager/src/main/java/com/wazuh/contentmanager/updater/ContentUpdater.java`
@ -66,40 +67,6 @@ Handles initial content bootstrapping:
The plugin is configured through the `PluginSettings` class. Settings can be defined in `opensearch.yml`:
### Available Settings
| Setting | Default | Description |
|---------|---------|-------------|
| `content_manager.cti.api` | `https://cti.wazuh.com/api/v1` | CTI API base URL |
| `content_manager.cti.consumer` | `vd_4.8.0` | Consumer ID for tracking |
| `content_manager.cti.context` | `vd_1.0.0` | Context ID for versioning |
| `content_manager.cti.client.max_attempts` | `3` | Maximum retry attempts (2-5) |
| `content_manager.cti.client.sleep_time` | `60` | Initial retry delay in seconds (20-100) |
| `content_manager.client_timeout` | `10` | Client timeout in seconds |
| `content_manager.max_changes` | `1000` | Maximum changes per update batch |
| `content_manager.max_items_per_bulk` | `25` | Items per bulk request |
| `content_manager.max_concurrent_bulks` | `5` | Concurrent bulk operations |
| `content_manager.job.max_docs` | `1000` | Maximum documents per job |
| `content_manager.job.schedule` | `1` | Job schedule interval |
### Example Configuration
```yaml
# opensearch.yml
content_manager:
cti:
api: "https://cti.wazuh.com/api/v1"
consumer: "vd_4.8.0"
context: "vd_1.0.0"
client:
max_attempts: 3
sleep_time: 60
max_changes: 1000
max_items_per_bulk: 25
client_timeout: 10
```
---
## 🔄 How Content Synchronization Works
@ -107,27 +74,26 @@ content_manager:
When the plugin starts on a cluster manager node:
1. Waits for the `wazuh-ruleset` index to be created by the setup plugin
2. Creates the `wazuh-context` index if it doesn't exist
3. Checks the consumer's offset:
- **If offset = 0**: Downloads and indexes a snapshot
- **If offset > 0**: Proceeds with incremental updates
1. Creates the `.cti-consumers` index if it doesn't exist
2. Checks the consumer's local_offset:
- **If local_offset = 0**: Downloads and indexes a snapshot
- **If local_offset > 0**: Proceeds with incremental updates
### 2. **Update Phase**
The update process follows these steps:
1. Fetches current consumer information from `wazuh-context`
2. Compares `offset` with `lastOffset` from CTI API
1. Fetches current consumer information from `.cti-consumers`
2. Compares `local_offset` with `remote_offset` from CTI API
3. If different, fetches changes in batches (max `content_manager.max_changes`)
4. Applies changes using JSON Patch operations (add, update, delete)
5. Updates the offset after successful application
6. Repeats until `offset == lastOffset`
5. Updates the local_offset after successful application
6. Repeats until `local_offset == remote_offset`
### 3. **Error Handling**
- **Recoverable errors**: Updates offset and retries later
- **Critical failures**: Resets offset to 0, triggering snapshot re-initialization
- **Recoverable errors**: Updates local_offset and retries later
- **Critical failures**: Resets local_offset to 0, triggering snapshot re-initialization
---
@ -136,7 +102,7 @@ The update process follows these steps:
### Check Consumer Status
```bash
GET /wazuh-context/_search
GET /.cti-consumers/_search
{
"query": {
"match_all": {}
@ -166,11 +132,9 @@ tail -f logs/opensearch.log | grep -E "ContentManager|ContentUpdater|SnapshotMan
## 📌 Important Notes
- The plugin only runs on **cluster manager nodes**
- Requires the **setup plugin** to create the `wazuh-ruleset` index first
- CTI API must be accessible for content synchronization
- Offset-based synchronization ensures no content is missed
- Snapshot initialization provides a fast bootstrap mechanism
- All operations are performed with appropriate privileges using the `Privileged` wrapper
---

View File

@ -14,7 +14,6 @@ To verify everything is working correctly, try generating reports following the
- Wazuh Dashboard package (debian package based on OpenSearch 3.1.0). Downloaded from [wazuh-dashboard actions](https://github.com/wazuh/wazuh-dashboard/actions/runs/16009728935).
> Note: To test using RPM packages, update the Vagrant configuration and provisioning scripts accordingly (for example, change `generic/ubuntu2204` to `generic/centos7` in the Vagrantfile and replace Debian-specific installation commands with RPM equivalents).
### Preparing a development environment
Prepare a multi-VM Vagrant environment with the following components:
@ -310,4 +309,4 @@ gencert_ec
echo "admin" | /usr/share/wazuh-indexer/bin/opensearch-keystore add opensearch.notifications.core.email.mailpit.password
chown wazuh-indexer:wazuh-indexer /etc/wazuh-indexer/opensearch.keystore
```
3. Ensure `mailpit` is accessible within the `server` VM (e.g `curl https://172.28.128.136:8025 -k -u admin:admin` should return HTML code). If not, add it to the list of known hosts in `/etc/hosts` (e.g `echo "172.28.128.136 mailpit mailpit" >> /etc/hosts`).
3. Ensure `mailpit` is accessible within the `server` VM (e.g `curl https://172.28.128.136:8025 -k -u admin:admin` should return HTML code). If not, add it to the list of known hosts in `/etc/hosts` (e.g `echo "172.28.128.136 mailpit mailpit" >> /etc/hosts`).

View File

@ -23,12 +23,9 @@ new-user:
backend_roles: []
description: "New user description"
```
OpenSearch's reference:
- [internal_users.yml](https://docs.opensearch.org/docs/latest/security/configuration/yaml/#internal_usersyml)
## 2. Adding a new role
Add the new role to the `roles.wazuh.yml` file located at: `wazuh-indexer/distribution/src/config/security/`.
- Under `index_permissions.index_patterns`, list the index patterns the role will have effect on.
- Under `index_permissions.allowed_actions`, list the allowed action groups or indiviual permissions granted to this role.
@ -50,7 +47,6 @@ role-read:
- "read"
tenant_permissions: []
static: true
role-write:
reserved: true
hidden: false
@ -86,7 +82,6 @@ role-read:
users:
- "new-user"
and_backend_roles: [ ]
role-write:
reserved: true
hidden: false
@ -128,4 +123,4 @@ The `indexer-security-init.sh` will overwrite your security configuration, inclu
Alternatively, apply the new configuration using fine-grained options. See [Applying changes to configuration files](https://docs.opensearch.org/docs/latest/security/configuration/security-admin/)
</div>
</div>

View File

@ -66,15 +66,12 @@ public class SetupPlugin extends Plugin implements ClusterPlugin {
> curl -X GET <indexer-IP>:9200/_cat/indices?v
> ```
Alternatively, use the Developer Tools console from the Wazuh Dashboard, or your browser.
## 🔁 Creating a New ISM (Index State Management) Policy
### 1. Add Rollover Alias to the Index Template
Edit the existing index template JSON file and add the following setting:
```json
"plugins.index_state_management.rollover_alias": "<index-name>"
```
### 2. Define the ISM Policy
Refer to the [OpenSearch ISM Policies documentation](https://docs.opensearch.org/docs/latest/im-plugin/ism/policies/) for more details.
@ -148,4 +145,4 @@ Always follow existing naming conventions to maintain consistency.
Use epoch timestamps (in milliseconds) for `last_updated_time` fields.
ISM policies and templates must be properly deployed before the indices are created.
ISM policies and templates must be properly deployed before the indices are created.

View File

@ -1 +0,0 @@
# API reference

View File

@ -0,0 +1,3 @@
# API Reference
The Content Manager plugin exposes an HTTP REST API. For detailed information about the API endpoints, request/response formats, and other specifications, please refer to its OpenAPI document [openapi.yml](https://raw.githubusercontent.com/wazuh/wazuh-indexer-plugins/refs/heads/main/plugins/content-manager/openapi.yml).

View File

@ -37,19 +37,19 @@ sequenceDiagram
end
```
## Schema of the `wazuh-content` index
## Schema of the `.cti-consumers` index
[ONLINE]
```json
[
{
"_index": "wazuh-content",
"_id": "vd_1.0.0",
"_index": ".cti-consumers",
"_id": "consumer-name",
"_source": {
"vd_4.8.0": {
"offset": 75019,
"last_offset": 85729
}
"name": "consumer-name",
"local_offset": 75019,
"remote_offset": 85729,
"snapshot_link": "uri-to-snapshot"
}
},
]
@ -58,14 +58,13 @@ sequenceDiagram
```json
[
{
"_index": "wazuh-content",
"_id": "vd_1.0.0",
"_index": ".cti-consumers",
"_id": "consumer-name",
"_source": {
"vd_4.8.0": {
"offset": 0,
"snapshot": "uri-to-snapshot"
}
"name": "consumer-name",
"local_offset": 0,
"snapshot_link": "uri-to-snapshot"
}
}
},
]
```

View File

@ -13,20 +13,23 @@ These default users and roles definitions are stored in the `internal_users.yml`
### Users
| User | Description | Roles |
| ----------------- | ---------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------- |
| `wazuh-server` | User for the Wazuh Server with read/write access to stateful indices and write-only access to stateless indices. | `stateless-write`, `stateful-delete`, `stateful-write`, `stateful-read` |
| `wazuh-dashboard` | User for Wazuh Dashboard with read access to stateful and stateless indices, and management level permissionsfor the monitoring indices. | `sample-data-management`, `metrics-write`, `metrics-read`, `stateless-read`, `stateful-read` |
| User | Description | Roles |
|-------------------|------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
| `wazuh-server` | User for the Wazuh Server with read/write access to stateful indices and write-only access to stateless indices. | `stateless-write`, `stateful-delete`, `stateful-write`, `stateful-read`, `cm_subscription_read` |
| `wazuh-dashboard` | User for Wazuh Dashboard with read access to stateful and stateless indices, and management level permissionsfor the monitoring indices. | `sample-data-management`, `metrics-write`, `metrics-read`, `stateless-read`, `stateful-read`, `cm_update`, `cm_subscription_write` |
### Roles
| Role Name | Access Description | Index Patterns | Permissions |
| ------------------------ | --------------------------------------------------- | ---------------------------------------- | ----------------------- |
| `stateful-read` | Grants read-only permissions to stateful indices. | `wazuh-states-*` | `read` |
| `stateful-write` | Grants write-only permissions to stateful indices. | `wazuh-states-*` | `index` |
| `stateful-delete` | Grants delete permissions to stateful indices. | `wazuh-states-*` | `delete` |
| `stateless-read` | Grants read-only permissions to stateless indices. | `wazuh-alerts*`, `wazuh-archives*` | `read` |
| `stateless-write` | Grants write-only permissions to stateless indices. | `wazuh-alerts*`, `wazuh-archives*` | `index` |
| `metrics-read` | Grants read permissions to metrics indices. | `wazuh-monitoring*`, `wazuh-statistics*` | `read` |
| `metrics-write` | Grants write permissions to metrics indices. | `wazuh-monitoring*`, `wazuh-statistics*` | `index` |
| `sample-data-management` | Grants full permissions to sample data indices. | `*-sample-*` | `data_access`, `manage` |
| Role Name | Access Description | Index Patterns | Permissions |
|--------------------------|--------------------------------------------------------------------------------|------------------------------------------|------------------------------------------------------------------------------------------|
| `stateful-read` | Grants read-only permissions to stateful indices. | `wazuh-states-*` | `read` |
| `stateful-write` | Grants write-only permissions to stateful indices. | `wazuh-states-*` | `index` |
| `stateful-delete` | Grants delete permissions to stateful indices. | `wazuh-states-*` | `delete` |
| `stateless-read` | Grants read-only permissions to stateless indices. | `wazuh-alerts*`, `wazuh-archives*` | `read` |
| `stateless-write` | Grants write-only permissions to stateless indices. | `wazuh-alerts*`, `wazuh-archives*` | `index` |
| `metrics-read` | Grants read permissions to metrics indices. | `wazuh-monitoring*`, `wazuh-statistics*` | `read` |
| `metrics-write` | Grants write permissions to metrics indices. | `wazuh-monitoring*`, `wazuh-statistics*` | `index` |
| `sample-data-management` | Grants full permissions to sample data indices. | `*-sample-*` | `data_access`, `manage` |
| `cm_subscription_read` | Grants permissions to retrieve subscriptions for the server. | N/A | `plugin:content_manager/subscription_get` |
| `cm_subscription_write` | Grants permissions to create and delete subscriptions for the content manager. | N/A | `plugin:content_manager/subscription_post`, `plugin:content_manager/subscription_delete` |
| `cm_update` | Grants permissions to perform update operations in the content manager. | N/A | `plugin:content_manager/update` |

View File

@ -127,6 +127,12 @@ dependencies {
implementation 'com.google.code.gson:gson:2.8.9'
}
// Ensure the yamlRestTest source set can access plugin main and test classes
dependencies {
yamlRestTestImplementation sourceSets.main.output
yamlRestTestImplementation sourceSets.test.output
}
task integTest(type: RestIntegTestTask) {
description = "Run tests against a cluster"
testClassesDirs = sourceSets.test.output.classesDirs

View File

@ -1,25 +1,324 @@
openapi: "3.0.3"
openapi: 3.0.0
info:
title: Wazuh Indexer Content Manager API
version: "1.0"
title: Indexer Content Manager API
description: |
API for content management and subscription handling for the Wazuh Indexer Content Manager.
This API enables Cyber Threat Intelligence (CTI) subscription registration, credential management,
and on-demand content updates for threat intelligence feeds.
version: 1.0.0
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0.html
servers:
- url: http://127.0.0.1:9200/_plugins/_content_manager
- url: "{protocol}://{wazuh.indexer}:{port}/_plugins/content-manager"
description: Wazuh Indexer Content Manager Plugin
variables:
protocol:
enum:
- "http"
- "https"
default: "https"
wazuh.indexer:
default: localhost
port:
default: "9200"
tags:
- name: Subscription Management
description: Operations for managing CTI subscriptions and credentials
- name: Content Updates
description: Operations for triggering content updates
security:
- bearerAuth: []
- basicAuth: []
paths:
/updater:
get:
summary: Launch the Content Updater module
/subscription:
description: CTI registration & CTI credentials retrieval
post:
summary: Register CTI Subscription
description: |
Register a new Cyber Threat Intelligence subscription using device code authentication flow.
This endpoint initiates the subscription process and must be called with valid device code
credentials obtained from the CTI provider. The registration process is asynchronous and
may take several seconds to complete.
operationId: registerSubscription
tags:
- "update"
operationId: updater
parameters:
- name: "from_offset"
in: query
description: The offset from which the updater will start
required: false
- Subscription Management
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/SubscriptionPostRequest'
examples:
registration:
summary: Example subscription registration
value:
device_code: "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS"
client_id: "a17c21ed"
expires_in: 1800
interval: 5
responses:
'201':
$ref: '#/components/responses/SubscriptionCreated'
'400':
$ref: '#/components/responses/BadRequest'
'401':
$ref: '#/components/responses/Unauthorized'
'500':
$ref: '#/components/responses/InternalServerError'
get:
summary: Retrieve CTI Credentials
description: |
Retrieve the access token and credentials for an active CTI subscription.
The access token should be used as a Bearer token for subsequent API calls to CTI services.
operationId: getCredentials
tags:
- Subscription Management
responses:
'200':
$ref: '#/components/responses/GetCredentialsSuccess'
'401':
$ref: '#/components/responses/Unauthorized'
'404':
$ref: '#/components/responses/NotFound'
'500':
$ref: '#/components/responses/InternalServerError'
delete:
summary: Delete CTI Subscription
description: |
Delete an existing CTI subscription and revoke all associated credentials.
This operation is irreversible and will immediately terminate access to threat intelligence feeds.
operationId: deleteSubscription
tags:
- Subscription Management
responses:
'200':
$ref: '#/components/responses/SubscriptionDeleted'
'401':
$ref: '#/components/responses/Unauthorized'
'404':
$ref: '#/components/responses/NotFound'
'500':
$ref: '#/components/responses/InternalServerError'
/update:
description: On demand content update
post:
summary: Initiate On-Demand Content Update
description: |
Trigger an immediate update of threat intelligence content from subscribed CTI feeds.
This operation is asynchronous and returns immediately with a task identifier.
The update process runs in the background and may take several minutes to complete
depending on the volume of new threat intelligence data.
**Rate Limiting**: This endpoint is rate-limited to prevent excessive API calls.
The default limit is 10 requests per hour.
operationId: updateContent
tags:
- Content Updates
responses:
'202':
$ref: '#/components/responses/UpdateAccepted'
'401':
$ref: '#/components/responses/Unauthorized'
'404':
$ref: '#/components/responses/NotFound'
'409':
$ref: '#/components/responses/Conflict'
'429':
$ref: '#/components/responses/TooManyRequests'
'500':
$ref: '#/components/responses/InternalServerError'
components:
# --- Data Schemas ---
schemas:
SubscriptionPostRequest:
type: object
description: Input parameters for registering a CTI subscription.
properties:
device_code:
type: string
description: The device code for registration.
example: "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS"
client_id:
type: string
description: The client ID.
example: "a17c21ed"
expires_in:
type: integer
description: Expiration time in seconds.
example: 1800
interval:
type: integer
description: Polling interval in seconds.
example: 5
required:
- device_code
- client_id
- expires_in
- interval
SubscriptionGetResponse:
type: object
description: Response schema for retrieving CTI credentials.
properties:
access_token:
type: string
description: Access token for authentication.
example: "AYjcyMzY3ZDhiNmJkNTY"
token_type:
type: string
description: Type of the token.
example: "Bearer"
required:
- access_token
- token_type
RestResponse:
type: object
description: Standard error response schema.
properties:
error:
type: string
required:
- error
# --- Responses schemas ---
responses:
# --- Success Responses ---
SubscriptionCreated:
description: Created - Subscription registered successfully.
GetCredentialsSuccess:
description: OK - Credentials retrieved successfully.
content:
application/json:
schema:
$ref: '#/components/schemas/SubscriptionGetResponse'
examples:
credentials:
summary: Valid credentials response
value:
access_token: "AYjcyMzY3ZDhiNmJkNTY"
token_type: "Bearer"
SubscriptionDeleted:
description: OK - Subscription deleted successfully.
UpdateAccepted:
description: Accepted - The update request has been accepted for processing.
headers:
X-RateLimit-Limit:
description: The maximum number of requests allowed per hour.
schema:
type: integer
responses:
"200":
description: OK
"500":
description: Internal server error (boom!)
example: 10
X-RateLimit-Remaining:
description: The number of requests remaining in the current window.
schema:
type: integer
example: 7
X-RateLimit-Reset:
description: The time at which the current rate limit window resets (Unix timestamp).
schema:
type: integer
example: 1699963200
# --- Error Responses ---
BadRequest:
description: Bad Request - Invalid input parameters.
content:
application/json:
schema:
$ref: '#/components/schemas/RestResponse'
examples:
missing_parameter:
summary: Missing required parameter
value:
error: "Required parameter 'client_id' is missing."
Unauthorized:
description: Unauthorized - Authentication failed or credentials are invalid.
NotFound:
description: Not Found - The requested resource does not exist.
Conflict:
description: Conflict - Undergoing content update.
content:
application/json:
schema:
$ref: '#/components/schemas/RestResponse'
examples:
undergoing_update:
summary: Content update in progress
value:
error: "A content update is already in progress."
TooManyRequests:
description: Too Many Requests - Rate limit exceeded.
headers:
X-RateLimit-Limit:
description: The maximum number of requests allowed per hour.
schema:
type: integer
example: 10
X-RateLimit-Remaining:
description: The number of requests remaining (will be 0).
schema:
type: integer
example: 0
X-RateLimit-Reset:
description: The time at which the current rate limit window resets (Unix timestamp).
schema:
type: integer
example: 1699963200
Retry-After:
description: Number of seconds to wait before retrying.
schema:
type: integer
example: 3600
content:
application/json:
schema:
$ref: '#/components/schemas/RestResponse'
examples:
rate_limit_exceeded:
summary: Rate limit exceeded
value:
error: "Too many update requests. Please try again later."
InternalServerError:
description: Internal Server Error - Server encountered an unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/RestResponse'
examples:
internal_error:
summary: Internal server error
value:
error: "An unexpected error occurred while processing your request."
securitySchemes:
bearerAuth:
type: http
scheme: bearer
bearerFormat: JWT
description: |
Bearer token authentication using JWT (JSON Web Token).
Include the token in the Authorization header as: `Authorization: Bearer <token>`
Tokens can be obtained through the Wazuh authentication system.
basicAuth:
type: http
scheme: basic
description: |
**Secure Basic Authentication**.
Clients must provide username and password encoded in Base64.

View File

@ -17,46 +17,49 @@
package com.wazuh.contentmanager;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import com.wazuh.contentmanager.utils.SnapshotManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.*;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
/**
* Main class of the Content Manager Plugin
*/
public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.rest.services.RestDeleteSubscriptionAction;
import com.wazuh.contentmanager.rest.services.RestGetSubscriptionAction;
import com.wazuh.contentmanager.rest.services.RestPostSubscriptionAction;
import com.wazuh.contentmanager.rest.services.RestPostUpdateAction;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import com.wazuh.contentmanager.utils.SnapshotManager;
import org.opensearch.rest.RestHandler;
import java.util.List;
import java.util.Collections;
/** Main class of the Content Manager Plugin */
public class ContentManagerPlugin extends Plugin implements ClusterPlugin, ActionPlugin {
private static final Logger log = LogManager.getLogger(ContentManagerPlugin.class);
/**
* Semaphore to ensure the context index creation is only triggered once.
*/
private static final Semaphore indexCreationSemaphore = new Semaphore(1);
private ConsumersIndex consumersIndex;
private ContentIndex contentIndex;
private SnapshotManager snapshotManager;
@ -64,6 +67,11 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
private ClusterService clusterService;
private CtiConsole ctiConsole;
// Rest API endpoints
public static final String PLUGINS_BASE_URI = "/_plugins/content-manager";
public static final String SUBSCRIPTION_URI = PLUGINS_BASE_URI + "/subscription";
public static final String UPDATE_URI = PLUGINS_BASE_URI + "/update";
@Override
public Collection<Object> createComponents(
Client client,
@ -83,8 +91,10 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
this.consumersIndex = new ConsumersIndex(client);
this.contentIndex = new ContentIndex(client);
this.snapshotManager =
new SnapshotManager(environment, this.consumersIndex, this.contentIndex, new Privileged());
// this.ctiConsole = new CtiConsole(new AuthServiceImpl());
new SnapshotManager(environment, this.consumersIndex, this.contentIndex, new Privileged());
// Content Manager 5.0
this.ctiConsole = new CtiConsole();
return Collections.emptyList();
}
@ -99,10 +109,8 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
public void onNodeStarted(DiscoveryNode localNode) {
// Only cluster managers are responsible for the initialization.
if (localNode.isClusterManagerNode()) {
log.info("Starting Content Manager plugin initialization");
this.start();
}
/*
// Use case 1. Polling
AuthServiceImpl authService = new AuthServiceImpl();
@ -138,6 +146,22 @@ public class ContentManagerPlugin extends Plugin implements ClusterPlugin {
*/
}
public List<RestHandler> getRestHandlers(
Settings settings,
org.opensearch.rest.RestController restController,
org.opensearch.common.settings.ClusterSettings clusterSettings,
org.opensearch.common.settings.IndexScopedSettings indexScopedSettings,
org.opensearch.common.settings.SettingsFilter settingsFilter,
org.opensearch.cluster.metadata.IndexNameExpressionResolver indexNameExpressionResolver,
java.util.function.Supplier<org.opensearch.cluster.node.DiscoveryNodes> nodesInCluster) {
return List.of(
new RestGetSubscriptionAction(this.ctiConsole),
new RestPostSubscriptionAction(this.ctiConsole),
new RestDeleteSubscriptionAction(this.ctiConsole),
new RestPostUpdateAction(this.ctiConsole)
);
}
/**
* Initialize. The initialization consists of:
*

View File

@ -27,10 +27,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.Environment;
import org.opensearch.action.ActionType;
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
@ -41,8 +37,8 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
import com.wazuh.contentmanager.model.cti.Changes;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.VisibleForTesting;
import com.wazuh.contentmanager.utils.XContentUtils;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.model.cti;
package com.wazuh.contentmanager.cti.catalog.model;
import org.opensearch.core.common.ParsingException;
import org.opensearch.core.xcontent.ToXContentObject;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.model.cti;
package com.wazuh.contentmanager.cti.catalog.model;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.model.cti;
package com.wazuh.contentmanager.cti.catalog.model;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

View File

@ -14,7 +14,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.wazuh.contentmanager.model.cti;
package com.wazuh.contentmanager.cti.catalog.model;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@ -3,6 +3,7 @@ package com.wazuh.contentmanager.cti.console;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.service.AuthService;
import com.wazuh.contentmanager.cti.console.service.PlansService;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.util.concurrent.FutureUtils;
@ -89,10 +90,10 @@ public class CtiConsole implements TokenListener {
}
@Override
public void onTokenChanged(Token t) {
public void onTokenChanged(Token token) {
tokenLock.lock();
try {
this.token = t;
this.token = token;
log.info("Permanent token changed: {}", this.token); // TODO do not log the token
// Cancel polling
@ -108,19 +109,19 @@ public class CtiConsole implements TokenListener {
/**
* Starts a periodic task to obtain a permanent token from the CTI Console.
* @param interval the period between successive executions.
* @param subscription subscription details, including the period between successive executions.
*/
private void getToken(int interval/* TODO sub details */) {
Runnable getTokenTask = () -> this.authService.getToken("client_id", "polling");
this.getTokenTaskFuture = this.executor.scheduleAtFixedRate(getTokenTask, interval, interval, TimeUnit.SECONDS);
private void getToken(Subscription subscription) {
Runnable getTokenTask = () -> this.authService.getToken(subscription);
this.getTokenTaskFuture = this.executor.scheduleAtFixedRate(getTokenTask, subscription.getInterval(), subscription.getInterval(), TimeUnit.SECONDS);
}
/**
* Triggers the mechanism to obtain a permanent token from the CTI Console.
* This method is meant to be called by the Rest handler.
*/
public void onPostSubscriptionRequest (/* TODO sub details */) {
this.getToken(5);
public void onPostSubscriptionRequest (Subscription subscription) {
this.getToken(subscription);
}
/**
@ -184,4 +185,12 @@ public class CtiConsole implements TokenListener {
tokenLock.unlock();
}
}
/**
* Deletes the permanent token stored in this CTI Console instance.
*/
public void deleteToken() {
this.token = null;
}
}

View File

@ -6,5 +6,5 @@ import java.util.EventListener;
public interface TokenListener extends EventListener {
void onTokenChanged(Token t);
void onTokenChanged(Token token);
}

View File

@ -1,5 +1,6 @@
package com.wazuh.contentmanager.cti.console.client;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.utils.http.HttpResponseCallback;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
@ -120,14 +121,14 @@ public class ApiClient {
* @throws InterruptedException request failed / interrupted.
* @throws TimeoutException request timed out.
*/
public SimpleHttpResponse getResourceToken(String permanentToken, String resource) throws ExecutionException, InterruptedException, TimeoutException {
public SimpleHttpResponse getResourceToken(Token permanentToken, String resource) throws ExecutionException, InterruptedException, TimeoutException {
String formBody = String.join("&", List.of(
"grant_type=urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token_type=urn:ietf:params:oauth:token-type:access_token",
"requested_token_type=urn:wazuh:params:oauth:token-type:signed_url",
"resource=" + resource
));
String token = String.format(Locale.ROOT, "Bearer %s", permanentToken);
String token = String.format(Locale.ROOT, "%s %s", permanentToken.getTokenType(), permanentToken.getAccessToken());
SimpleHttpRequest request = SimpleRequestBuilder
.post(RESOURCE_URI)
@ -153,8 +154,8 @@ public class ApiClient {
* @throws InterruptedException request failed / interrupted.
* @throws TimeoutException request timed out.
*/
public SimpleHttpResponse getPlans(String permanentToken) throws ExecutionException, InterruptedException, TimeoutException {
String token = String.format(Locale.ROOT, "Bearer %s", permanentToken);
public SimpleHttpResponse getPlans(Token permanentToken) throws ExecutionException, InterruptedException, TimeoutException {
String token = String.format(Locale.ROOT, "%s %s", permanentToken.getTokenType(), permanentToken.getAccessToken());
SimpleHttpRequest request = SimpleRequestBuilder
.get(PRODUCTS_URI)

View File

@ -0,0 +1,200 @@
package com.wazuh.contentmanager.cti.console.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.core.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Subscription model for managing CTI subscription data.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Subscription {
public static final String DEVICE_CODE = "device_code";
public static final String CLIENT_ID = "client_id";
public static final String EXPIRES_IN = "expires_in";
public static final String INTERVAL = "interval";
@JsonProperty(DEVICE_CODE)
private String deviceCode;
@JsonProperty(CLIENT_ID)
private String clientId;
@JsonProperty(EXPIRES_IN)
private int expiresIn;
@JsonProperty(INTERVAL)
private int interval;
/**
* Default constructor for frameworks that require a no-arg constructor
*/
public Subscription() { }
/**
* Constructs a Subscription with all fields set.
*
* @param deviceCode the device code returned by the CTI provider
* @param clientId the client identifier associated with the subscription
* @param expiresIn seconds until the device code expires
* @param interval polling interval in seconds to check subscription status
*/
public Subscription(String deviceCode, String clientId, int expiresIn, int interval) {
this.setDeviceCode(deviceCode);
this.setClientId(clientId);
this.setExpiresIn(expiresIn);
this.setInterval(interval);
}
/**
* Parse a {@link Subscription} from the provided {@link org.opensearch.core.xcontent.XContentParser}.
*
* The parser expects the following top-level fields to be present in the
* XContent object: {@code device_code}, {@code client_id}, {@code expires_in}
* and {@code interval}. If any required field is missing an
* {@link IllegalArgumentException} is thrown.
*
* @param parser the XContent parser positioned at the start of an object
* @return a new {@code Subscription} instance populated with parsed values
* @throws IOException if an I/O error occurs while reading from the parser
* @throws IllegalArgumentException if required fields are missing
*/
public static Subscription parse(XContentParser parser) throws IOException {
String deviceCode = null;
String clientId = null;
Integer expiresIn = null;
Integer interval = null;
XContentParser.Token token;
// Move to the next token, which should be the start of the object's fields
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
parser.nextToken(); // Move to the value token
switch (fieldName) {
case DEVICE_CODE -> deviceCode = parser.text();
case CLIENT_ID -> clientId = parser.text();
case EXPIRES_IN -> expiresIn = parser.intValue();
case INTERVAL -> interval = parser.intValue();
default -> { /* ignore unknown fields */ }
}
} else if (token == XContentParser.Token.END_OBJECT) {
// Break out once the object is fully parsed
break;
}
}
// Check for missing params
List<String> missingParams = new ArrayList<>();
if (deviceCode == null) {
missingParams.add(DEVICE_CODE);
}
if (clientId == null) {
missingParams.add(CLIENT_ID);
}
if (expiresIn == null) {
missingParams.add(EXPIRES_IN);
}
if (interval == null) {
missingParams.add(INTERVAL);
}
// Throw error if required params are missing.
if (!missingParams.isEmpty()) {
throw new IllegalArgumentException("Missing required parameters: " + missingParams);
}
// Return new instance of Subscription
return new Subscription(deviceCode, clientId, expiresIn, interval);
}
/**
* Returns the device code for the subscription.
*
* @return the device code string, may be null
*/
public String getDeviceCode() {
return this.deviceCode;
}
/**
* Sets the device code for the subscription.
*
* @param deviceCode the device code returned by the CTI provider
*/
public void setDeviceCode(String deviceCode) {
this.deviceCode = deviceCode;
}
/**
* Returns the client identifier associated with this subscription.
*
* @return the client id string, may be null
*/
public String getClientId() {
return this.clientId;
}
/**
* Sets the client identifier for this subscription.
*
* @param clientId the client id to set
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
* Returns the lifetime in seconds until the device code expires.
*
* @return number of seconds until expiration
*/
public int getExpiresIn() {
return this.expiresIn;
}
/**
* Sets the expiration lifetime in seconds for the device code.
*
* @param expiresIn the expiration time in seconds
*/
public void setExpiresIn(int expiresIn) {
this.expiresIn = expiresIn;
}
/**
* Returns the polling interval (in seconds) to check the subscription status.
*
* @return polling interval in seconds
*/
public int getInterval() {
return this.interval;
}
/**
* Sets the polling interval in seconds for subscription status checks.
*
* @param interval polling interval in seconds
*/
public void setInterval(int interval) {
this.interval = interval;
}
/**
* Returns a compact string representation of this Subscription.
*
* @return string representation containing deviceCode, clientId, expiresIn and interval
*/
@Override
public String toString() {
return "{" +
"deviceCode='" + deviceCode + '\'' +
", clientId='" + clientId + '\'' +
", expiresIn=" + expiresIn +
", interval=" + interval +
'}';
}
}

View File

@ -2,20 +2,47 @@ package com.wazuh.contentmanager.cti.console.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import java.io.IOException;
/**
* CTI token DTO.
* Data transfer object that represents an authentication token returned by a
* CTI provider. This class is used to deserialize JSON responses that
* contain an access token and its type (for example, "Bearer").
*
* <p>Instances of this class can be converted to OpenSearch XContent using
* the {@link org.opensearch.core.xcontent.ToXContent} interface implementation.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Token {
@JsonProperty("access_token")
public class Token implements ToXContent {
private static final String ACCESS_TOKEN = "access_token";
private static final String TOKEN_TYPE = "token_type";
@JsonProperty(ACCESS_TOKEN)
private String accessToken;
@JsonProperty(TOKEN_TYPE)
private String tokenType;
/**
* Default constructor.
*/
public Token() { }
/**
* Creates a Token instance with the provided access token and token type.
*
* @param accessToken the access token issued by the CTI provider
* @param tokenType the type of the token (e.g., "Bearer")
*/
public Token(String accessToken, String tokenType) {
this.accessToken = accessToken;
this.tokenType = tokenType;
}
/**
* Getter for accessToken.
* @return Access Token.
@ -24,11 +51,57 @@ public class Token {
return this.accessToken;
}
/**
* Returns the token type (e.g., "Bearer").
*
* @return the token type string, may be null
*/
public String getTokenType() {
return this.tokenType;
}
/**
* Returns a compact string representation of this Token for logging.
*
* @return string representation containing accessToken and tokenType
*/
@Override
public String toString() {
return "Token{" +
"accessToken='" + accessToken + '\'' +
", tokenType='" + tokenType + '\'' +
'}';
}
/**
* Serializes this Token into an {@link XContentBuilder} using JSON format.
*
* @return an {@link XContentBuilder} containing the JSON representation
* of this Token
* @throws IOException if an I/O error occurs while building the content
*/
public XContentBuilder toXContent() throws IOException {
return this.toXContent(XContentFactory.jsonBuilder(), null);
}
/**
* Writes the fields of this Token into the provided {@link XContentBuilder}.
* The resulting structure is a JSON object with the keys {@code access_token}
* and {@code token_type}.
*
* @param builder the XContent builder to write into
* @param params optional parameters (may be ignored)
* @return the same {@link XContentBuilder} instance passed as {@code builder}
* @throws IOException if an I/O error occurs while writing to the builder
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(ACCESS_TOKEN, this.getAccessToken())
.field(TOKEN_TYPE, this.getTokenType())
.endObject();
return builder;
}
}

View File

@ -3,11 +3,12 @@ package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.TokenListener;
import com.wazuh.contentmanager.cti.console.client.ClosableHttpClient;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.model.Subscription;
public interface AuthService extends ClosableHttpClient {
Token getToken(String clientId, String deviceCode);
Token getResourceToken(String permanentToken, String resource);
Token getToken(Subscription subscription);
Token getResourceToken(Token token, String resource);
void addListener(TokenListener listener);
}

View File

@ -2,6 +2,7 @@ package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.TokenListener;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -30,16 +31,14 @@ public class AuthServiceImpl extends AbstractService implements AuthService {
/**
* Obtains a permanent token for the instance from CTI Console.
* @param clientId unique client identifier for the instance.
* @param deviceCode unique device code provided by the CTI Console during the registration of the instance.
* @param subscription registration details of the instance.
* @return access token.
*/
// TODO replace parameters with SubscriptionModel from https://github.com/wazuh/wazuh-indexer-plugins/pull/662
@Override
public Token getToken(String clientId, String deviceCode) {
public Token getToken(Subscription subscription) {
try {
// Perform request
SimpleHttpResponse response = this.client.getToken(clientId, deviceCode);
SimpleHttpResponse response = this.client.getToken(subscription.getClientId(), subscription.getDeviceCode());
if (response.getCode() == 200) {
// Parse response
@ -66,7 +65,7 @@ public class AuthServiceImpl extends AbstractService implements AuthService {
* @return resource access token
*/
@Override
public Token getResourceToken(String permanentToken, String resource) {
public Token getResourceToken(Token permanentToken, String resource) {
try {
// Perform request
SimpleHttpResponse response = this.client.getResourceToken(permanentToken, resource);

View File

@ -2,10 +2,11 @@ package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.client.ClosableHttpClient;
import com.wazuh.contentmanager.cti.console.model.Plan;
import com.wazuh.contentmanager.cti.console.model.Token;
import java.util.List;
public interface PlansService extends ClosableHttpClient {
List<Plan> getPlans(String permanentToken);
List<Plan> getPlans(Token token);
}

View File

@ -3,6 +3,7 @@ package com.wazuh.contentmanager.cti.console.service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.wazuh.contentmanager.cti.console.model.Plan;
import com.wazuh.contentmanager.cti.console.model.Token;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -27,13 +28,13 @@ public class PlansServiceImpl extends AbstractService implements PlansService {
/**
* Obtains the list of plans the instance is subscribed to, including all associated products.
* @param permanentToken access token
* @param token permanent token
* @return list of plans the instance has access to.
*/
public List<Plan> getPlans(String permanentToken) {
public List<Plan> getPlans(Token token) {
try {
// Perform request
SimpleHttpResponse response = this.client.getPlans(permanentToken);
SimpleHttpResponse response = this.client.getPlans(token);
if (response.getCode() == 200) {
// Parse response

View File

@ -29,10 +29,8 @@ import org.opensearch.transport.client.Client;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContent;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
@ -43,7 +41,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.ClusterInfo;

View File

@ -52,9 +52,9 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.wazuh.contentmanager.model.cti.Changes;
import com.wazuh.contentmanager.model.cti.Offset;
import com.wazuh.contentmanager.model.cti.Operation;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.JsonPatch;
import com.wazuh.contentmanager.utils.XContentUtils;

View File

@ -0,0 +1,126 @@
package com.wazuh.contentmanager.rest.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import java.io.IOException;
/**
* General response model for REST API endpoints.
*
* <p>This class provides a standardized format for API responses that include
* a human-readable message and an HTTP status code. It can be serialized to
* OpenSearch XContent via the {@link org.opensearch.core.xcontent.ToXContent}
* interface implementation.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class RestResponse implements ToXContent {
private static final String MESSAGE = "message";
private static final String STATUS = "status";
@JsonProperty(MESSAGE)
private String message;
@JsonProperty(STATUS)
private int status;
/**
* Default constructor for frameworks that require a no-arg constructor
*/
public RestResponse() { }
/**
* Creates an ErrorResponse with the provided message and HTTP status code.
*
* @param message human-readable error message
* @param status HTTP status code representing the error condition
*/
public RestResponse(String message, int status) {
this.message = message;
this.status = status;
}
/**
* Returns the error message.
*
* @return the error message string, may be null
*/
public String getMessage() {
return this.message;
}
/**
* Sets or updates the error message.
*
* @param message the new error message
*/
public void setMessage(String message) {
this.message = message;
}
/**
* Returns the HTTP status code associated with this error.
*
* @return the HTTP status code
*/
public int getStatus() {
return this.status;
}
/**
* Sets the HTTP status code for this error response.
*
* @param status the HTTP status code to set
*/
public void setStatus(int status) {
this.status = status;
}
@Override
/**
* Returns a compact string representation of this ErrorResponse.
*
* @return string representation containing message and status
*/
public String toString() {
return "{" +
"message='" + message + '\'' +
", status=" + status +
'}';
}
/**
* Serializes this RestResponse into an {@link XContentBuilder} using JSON
* format.
*
* @return an {@link XContentBuilder} containing the JSON representation
* of this RestResponse
* @throws IOException if an I/O error occurs while building the content
*/
public XContentBuilder toXContent() throws IOException {
return this.toXContent(XContentFactory.jsonBuilder(), null);
}
/**
* Writes the fields of this RestResponse into the provided
* {@link XContentBuilder}. The resulting structure is a JSON object with
* the keys {@code message} and {@code status}.
*
* @param builder the XContent builder to write into
* @param params optional parameters (may be ignored)
* @return the same {@link XContentBuilder} instance passed as {@code builder}
* @throws IOException if an I/O error occurs while writing to the builder
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(MESSAGE, this.getMessage())
.field(STATUS, this.getStatus())
.endObject();
return builder;
}
}

View File

@ -0,0 +1,114 @@
package com.wazuh.contentmanager.rest.services;
import com.wazuh.contentmanager.ContentManagerPlugin;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.rest.model.RestResponse;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RestRequest;
import java.io.IOException;
import java.util.List;
import static org.opensearch.rest.RestRequest.Method.DELETE;
/**
* DELETE /_plugins/content-manager/subscription
*
* Deletes the current CTI subscription.
*
* Possible HTTP responses:
* - 200 OK: Subscription successfully deleted
* - 404 Not Found: No subscription exists to delete
* - 401 Unauthorized: The endpoint is being accessed by a different user, the expected user is wazuh-dashboard
* - 500 Internal Server Error: Unexpected error during processing
*/
public class RestDeleteSubscriptionAction extends BaseRestHandler {
private static final String ENDPOINT_NAME = "content_manager_subscription_delete";
private static final String ENDPOINT_UNIQUE_NAME = "plugin:content_manager/subscription_delete";
private final CtiConsole ctiConsole;
/**
* Create a new REST action.
*
* @param ctiConsole the CTI console used to access and delete subscription tokens
*/
public RestDeleteSubscriptionAction( CtiConsole ctiConsole) {
this.ctiConsole = ctiConsole;
}
/**
* Return a short identifier for this handler.
*
* @return a short identifier for this handler
*/
@Override
public String getName() { return ENDPOINT_NAME; }
/**
* Define the routes handled by this action.
*
* @return the list of routes exposed by this handler (DELETE subscription)
*/
@Override
public List<Route> routes() {
return List.of(
new NamedRoute.Builder()
.path(ContentManagerPlugin.SUBSCRIPTION_URI)
.method(DELETE)
.uniqueName(ENDPOINT_UNIQUE_NAME)
.build()
);
}
/**
* Prepare the request by returning a channel consumer that executes the
* deletion and sends the corresponding response. This endpoint ignores
* request body and query parameters.
*
* @param request the incoming REST request
* @param client the node client (unused)
* @return a consumer that will be executed to produce a response
*/
@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
return channel -> {
channel.sendResponse(this.handleRequest());
};
}
/**
* Execute the delete-subscription operation.
*
*
* @return a {@link BytesRestResponse} representing the HTTP response
* @throws IOException propagated if an I/O error occurs while building the response
*/
public BytesRestResponse handleRequest() throws IOException {
try {
Token token = this.ctiConsole.getToken();
if (token == null) {
RestResponse error = new RestResponse(
"Token not found",
RestStatus.NOT_FOUND.getStatus()
);
return new BytesRestResponse(RestStatus.NOT_FOUND, error.toXContent());
}
this.ctiConsole.deleteToken();
RestResponse response = new RestResponse("Subscription deleted successfully", RestStatus.OK.getStatus());
return new BytesRestResponse(RestStatus.OK, response.toXContent());
} catch (Exception e) {
RestResponse error = new RestResponse(
e.getMessage() != null ? e.getMessage() : "An unexpected error occurred while processing your request.",
RestStatus.INTERNAL_SERVER_ERROR.getStatus()
);
return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, error.toXContent());
}
}
}

View File

@ -0,0 +1,109 @@
package com.wazuh.contentmanager.rest.services;
import com.wazuh.contentmanager.ContentManagerPlugin;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.rest.model.RestResponse;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RestRequest;
import java.io.IOException;
import java.util.List;
import static org.opensearch.rest.RestRequest.Method.GET;
/**
* GET /_plugins/content-manager/subscription
*
* Retrieves the current CTI subscription token.
*
* Possible HTTP responses:
* - 200 OK: Subscription found, returns access token and token type
* - 404 Not Found: The token does not exist
* - 401 Unauthorized: The endpoint is being accessed by a different user, the expected user is wazuh-server
* - 500 Internal Server Error: Unexpected error during processing
*/
public class RestGetSubscriptionAction extends BaseRestHandler {
private static final String ENDPOINT_NAME = "content_manager_subscription_get";
private static final String ENDPOINT_UNIQUE_NAME = "plugin:content_manager/subscription_get";
private final CtiConsole ctiConsole;
/**
* Construct the REST handler.
*
* @param console the CTI console used to retrieve the token
*/
public RestGetSubscriptionAction(CtiConsole console) {
this.ctiConsole = console;
}
/**
* Return a short name identifying this handler.
*
* @return a short name identifying this handler
*/
@Override
public String getName() { return ENDPOINT_NAME; }
/**
* Return the route configuration for this handler.
*
* @return the route configuration for this handler
*/
@Override
public List<Route> routes() {
return List.of(
new NamedRoute.Builder()
.path(ContentManagerPlugin.SUBSCRIPTION_URI)
.method(GET)
.uniqueName(ENDPOINT_UNIQUE_NAME)
.build()
);
}
/**
* Prepare the request by returning a consumer that executes the lookup
* and sends the appropriate response. Query parameters and request body
* are ignored for this endpoint.
*
* @param request the incoming REST request
* @param client the node client (unused)
* @return a RestChannelConsumer that produces the response
*/
@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
return channel -> {
channel.sendResponse(this.handleRequest());
};
}
/**
* Execute the get-subscription operation.
*
* @return a BytesRestResponse containing the token information or error
* @throws IOException if an I/O error occurs while building the response
*/
public BytesRestResponse handleRequest() throws IOException {
try {
Token token = this.ctiConsole.getToken();
if (token == null) {
RestResponse error = new RestResponse(
"Token not found",
RestStatus.NOT_FOUND.getStatus()
);
return new BytesRestResponse(RestStatus.NOT_FOUND, error.toXContent());
}
return new BytesRestResponse(RestStatus.OK, token.toXContent());
} catch (Exception e) {
RestResponse error = new RestResponse(
e.getMessage() != null ? e.getMessage() : "An unexpected error occurred while processing your request.",
RestStatus.INTERNAL_SERVER_ERROR.getStatus()
);
return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, error.toXContent());
}
}
}

View File

@ -0,0 +1,118 @@
package com.wazuh.contentmanager.rest.services;
import com.wazuh.contentmanager.ContentManagerPlugin;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import com.wazuh.contentmanager.rest.model.RestResponse;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RestRequest;
import java.io.IOException;
import java.util.List;
import static org.opensearch.rest.RestRequest.Method.POST;
/**
* POST /_plugins/content-manager/subscription
*
* Creates or updates the CTI subscription.
*
* Possible HTTP responses:
* - 201 Created: Subscription successfully created or updated
* - 400 Bad Request: Missing required parameters (device_code, client_id, expires_in, interval)
* - 401 Unauthorized: The endpoint is being accessed by a different user, the expected user is wazuh-dashboard
* - 500 Internal Server Error: Unexpected error during processing
*/
public class RestPostSubscriptionAction extends BaseRestHandler {
private static final String ENDPOINT_NAME = "content_manager_subscription_post";
private static final String ENDPOINT_UNIQUE_NAME = "plugin:content_manager/subscription_post";
private final CtiConsole ctiConsole;
/**
* Construct the REST handler.
*
* @param console the CTI console used to handle subscription requests
*/
public RestPostSubscriptionAction(CtiConsole console) {
this.ctiConsole = console;
}
/**
* Return a short name identifying this handler.
*
* @return a short name identifying this handler
*/
@Override
public String getName() { return ENDPOINT_NAME; }
/**
* Return the route configuration for this handler.
*
* @return route configuration for POST subscription
*/
@Override
public List<Route> routes() {
return List.of(
new NamedRoute.Builder()
.path(ContentManagerPlugin.SUBSCRIPTION_URI)
.method(POST)
.uniqueName(ENDPOINT_UNIQUE_NAME)
.build()
);
}
/**
* Prepare the request by parsing the incoming subscription payload and
* returning a consumer that forwards the parsed DTO to {@link #handleRequest}.
*
* @param request the incoming REST request containing the subscription payload
* @param client the node client (unused)
* @return a RestChannelConsumer that processes the request and sends the response
*/
@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
return channel -> {
// Parse subscription details and create a new instance of Subscription DTO.
Subscription subscription = Subscription.parse(request.contentParser());
// Send response from handleRequest method which process the request.
channel.sendResponse(this.handleRequest(subscription));
};
}
/**
* Handle the subscription creation/update.
*
*
* @param subscription the parsed subscription DTO
* @return a BytesRestResponse representing the operation result
* @throws IOException if an I/O error occurs while building the response
*/
public BytesRestResponse handleRequest(Subscription subscription) throws IOException {
try {
// Notify CTI Console about a registration request
this.ctiConsole.onPostSubscriptionRequest(subscription);
// Return success
RestResponse response = new RestResponse("Subscription created successfully", RestStatus.CREATED.getStatus());
return new BytesRestResponse(RestStatus.CREATED, response.toXContent());
} catch (IllegalArgumentException e) {
RestResponse error = new RestResponse(
e.getMessage(),
RestStatus.BAD_REQUEST.getStatus()
);
return new BytesRestResponse(RestStatus.BAD_REQUEST, error.toXContent());
}
catch (Exception e) {
RestResponse error = new RestResponse(
e.getMessage() != null ? e.getMessage() : "An unexpected error occurred while processing your request.",
RestStatus.INTERNAL_SERVER_ERROR.getStatus()
);
return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, error.toXContent());
}
}
}

View File

@ -0,0 +1,133 @@
package com.wazuh.contentmanager.rest.services;
import com.wazuh.contentmanager.ContentManagerPlugin;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import com.wazuh.contentmanager.rest.model.RestResponse;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RestRequest;
import java.io.IOException;
import java.util.List;
import static org.opensearch.rest.RestRequest.Method.POST;
/**
* POST /_plugins/content-manager/update
*
* Triggers a CTI content update operation.
*
* Possible HTTP responses:
* - 202 Accepted: Update operation accepted and started
* - 404 Not Found: No subscription exists (subscription required before updating)
* - 401 Unauthorized: The endpoint is being accessed by a different user, the expected user is wazuh-server
* - 409 Conflict: Another update operation is already in progress
* - 429 Too Many Requests: Rate limit exceeded
* - 500 Internal Server Error: Unexpected error during processing
*
* Response headers (for rate limiting):
* - X-RateLimit-Limit: Maximum number of requests allowed per hour
* - X-RateLimit-Remaining: Number of requests remaining in current window
* - X-RateLimit-Reset: Unix timestamp when the rate limit window resets
*/
public class RestPostUpdateAction extends BaseRestHandler {
private static final String ENDPOINT_NAME = "content_manager_subscription_update";
private static final String ENDPOINT_UNIQUE_NAME = "plugin:content_manager/subscription_update";
private final CtiConsole ctiConsole;
/**
* Construct the update REST handler.
*
* @param console the CTI console used to check subscription state and trigger updates
*/
public RestPostUpdateAction(CtiConsole console) {
this.ctiConsole = console;
}
/**
* Return a short identifier for this handler.
*/
@Override
public String getName() { return ENDPOINT_NAME; }
/**
* Return the route configuration for this handler.
*
* @return route configuration for the update endpoint
*/
@Override
public List<Route> routes() {
return List.of(
// POST /_plugins/content-manager/update
new NamedRoute.Builder()
.path(ContentManagerPlugin.UPDATE_URI)
.method(POST)
.uniqueName(ENDPOINT_UNIQUE_NAME)
.build()
);
}
/**
* Prepare the request by returning a consumer that executes the update operation.
*
* @param request the incoming REST request
* @param client the node client
* @return a consumer that executes the update operation
*/
@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
return channel -> {
channel.sendResponse(this.handleRequest());
};
}
/**
* Execute the update operation.
*
* @return a BytesRestResponse describing the outcome
* @throws IOException if an I/O error occurs while building the response
*/
public BytesRestResponse handleRequest() throws IOException {
try {
// 1. Check if Token exists (404 Not Found)
if (this.ctiConsole.getToken() == null) {
RestResponse error = new RestResponse(
"Token not found. Please create a subscription before attempting to update.",
RestStatus.NOT_FOUND.getStatus()
);
return new BytesRestResponse(RestStatus.NOT_FOUND, error.toXContent());
}
// 2. Conflict Check (409 Conflict)
// TODO: Implement actual concurrency control
if (1 == 2) {
RestResponse error = new RestResponse(
"An update operation is already in progress. Please wait for it to complete.",
RestStatus.CONFLICT.getStatus()
);
return new BytesRestResponse(RestStatus.CONFLICT, error.toXContent());
}
// 3. Rate Limit Check (429 Too Many Requests)
/**
* - X-RateLimit-Limit: Maximum number of requests allowed per hour
* - X-RateLimit-Remaining: Number of requests remaining in current window
* - X-RateLimit-Reset: Unix timestamp when the rate limit window resets
*/
// TODO: Add actual update logic
RestResponse response = new RestResponse("Update accepted", RestStatus.ACCEPTED.getStatus());
return new BytesRestResponse(RestStatus.ACCEPTED, response.toXContent());
} catch (Exception e) {
RestResponse error = new RestResponse(
e.getMessage() != null ? e.getMessage() : "An unexpected error occurred while processing your request.",
RestStatus.INTERNAL_SERVER_ERROR.getStatus()
);
return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, error.toXContent());
}
}
}

View File

@ -22,8 +22,8 @@ import org.apache.logging.log4j.Logger;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.model.cti.Changes;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import com.wazuh.contentmanager.utils.VisibleForTesting;

View File

@ -21,7 +21,7 @@ import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.wazuh.contentmanager.model.cti.Operation;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
/**
* Utility class for applying JSON Patch operations to JSON documents.

View File

@ -19,8 +19,7 @@ package com.wazuh.contentmanager.utils;
import java.security.AccessController;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.model.cti.Changes;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
/** Privileged utility class for executing privileged HTTP requests. */
public class Privileged {

View File

@ -30,7 +30,7 @@ import java.util.concurrent.Semaphore;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.updater.ContentUpdater;

View File

@ -33,9 +33,9 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import com.wazuh.contentmanager.model.cti.Changes;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.model.cti.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.mockito.InjectMocks;
import org.mockito.Mock;

View File

@ -6,6 +6,7 @@ import com.wazuh.contentmanager.cti.console.service.AuthService;
import com.wazuh.contentmanager.cti.console.service.AuthServiceImpl;
import com.wazuh.contentmanager.cti.console.service.PlansService;
import com.wazuh.contentmanager.cti.console.service.PlansServiceImpl;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.After;
@ -69,7 +70,8 @@ public class CtiConsoleTests extends OpenSearchTestCase {
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
Token tokenA = this.authService.getToken("anyClientID", "anyDeviceCode");
Subscription subscription = new Subscription("anyClientID", "anyDeviceCode", 3600, 5);
Token tokenA = this.authService.getToken(subscription);
// Ensure onTokenChanged is invoked, and the token in the CtiConsole instance is updated.
Token tokenB = this.console.getToken();
@ -96,7 +98,8 @@ public class CtiConsoleTests extends OpenSearchTestCase {
.thenReturn(httpResponsePending, httpResponsePending, httpResponsePending, httpResponse);
// Start polling
this.console.onPostSubscriptionRequest();
Subscription subscription = new Subscription("anyClientID", "anyDeviceCode", 3600, 5);
this.console.onPostSubscriptionRequest(subscription);
// Wait for the token with a timeout
Token token = this.console.waitForToken();

View File

@ -2,11 +2,11 @@ package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.client.ApiClient;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opensearch.test.OpenSearchTestCase;
@ -60,7 +60,8 @@ public class AuthServiceTests extends OpenSearchTestCase {
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
Token token = this.authService.getToken("anyClientID", "anyDeviceCode");
Subscription subscription = new Subscription("anyClientID", "anyDeviceCode", 3600, 5);
Token token = this.authService.getToken(subscription);
// Token must not be null
assertNotNull(token);
@ -83,16 +84,17 @@ public class AuthServiceTests extends OpenSearchTestCase {
public void testGetTokenFailure() throws ExecutionException, InterruptedException, TimeoutException {
Token token;
String response = "{\"error\": \"invalid_request\", \"error_description\": \"Missing or invalid parameter: client_id\"}";
Subscription subscription = new Subscription("anyClientID", "anyDeviceCode", 3600, 5);
// When CTI replies with an error code, token must be null. No exception raised
when(this.mockClient.getToken(anyString(), anyString()))
.thenReturn(SimpleHttpResponse.create(400, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
token = this.authService.getToken("anyClientID", "anyDeviceCode");
token = this.authService.getToken(subscription);
assertNull(token);
// When CTI does not reply, token must be null and exceptions are raised.
when(this.mockClient.getToken(anyString(), anyString())).thenThrow(ExecutionException.class);
token = this.authService.getToken("anyClientID", "anyDeviceCode");
token = this.authService.getToken(subscription);
assertNull(token);
}
@ -109,10 +111,10 @@ public class AuthServiceTests extends OpenSearchTestCase {
public void testGetResourceTokenSuccess() throws ExecutionException, InterruptedException, TimeoutException {
// Mock client response upon request
String response = "{\"access_token\": \"https://localhost:8443/api/v1/catalog/contexts/misp/consumers/virustotal/changes?from_offset=0&to_offset=1000&with_empties=true&verify=1761383411-kJ9b8w%2BQ7kzRmF\", \"issued_token_type\": \"urn:wazuh:params:oauth:token-type:signed_url\", \"expires_in\": 300}";
when(this.mockClient.getResourceToken(anyString(), anyString()))
when(this.mockClient.getResourceToken(any(Token.class), anyString()))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
Token token = this.authService.getResourceToken("anyToken", "anyResource");
Token token = this.authService.getResourceToken(new Token("anyToken", "Bearer"), "anyResource");
// Token must not be null
assertNotNull(token);
@ -137,14 +139,14 @@ public class AuthServiceTests extends OpenSearchTestCase {
String response = "{\"error\": \"invalid_target\", \"error_description\": \"The resource parameter refers to an invalid endpoint\"}";
// When CTI replies with an error code, token must be null. No exception raised
when(this.mockClient.getResourceToken(anyString(), anyString()))
when(this.mockClient.getResourceToken(any(Token.class), anyString()))
.thenReturn(SimpleHttpResponse.create(400, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
token = this.authService.getResourceToken("anyToken", "anyResource");
token = this.authService.getResourceToken(new Token("anyToken", "Bearer"), "anyResource");
assertNull(token);
// When CTI does not reply, token must be null and exceptions are raised.
when(this.mockClient.getResourceToken(anyString(), anyString())).thenThrow(ExecutionException.class);
token = this.authService.getResourceToken("anyToken", "anyResource");
when(this.mockClient.getResourceToken(any(Token.class), anyString())).thenThrow(ExecutionException.class);
token = this.authService.getResourceToken(new Token("anyToken", "Bearer"), "anyResource");
assertNull(token);
}
}

View File

@ -2,11 +2,11 @@ package com.wazuh.contentmanager.cti.console.service;
import com.wazuh.contentmanager.cti.console.client.ApiClient;
import com.wazuh.contentmanager.cti.console.model.Plan;
import com.wazuh.contentmanager.cti.console.model.Token;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opensearch.test.OpenSearchTestCase;
@ -96,10 +96,10 @@ public class PlansServiceTests extends OpenSearchTestCase {
]
}
}""";
when(this.mockClient.getPlans(anyString()))
when(this.mockClient.getPlans(any(Token.class)))
.thenReturn(SimpleHttpResponse.create(200, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
List<Plan> plans = this.plansService.getPlans("anyToken");
List<Plan> plans = this.plansService.getPlans(new Token("anyToken", "Bearer"));
// plans must not be null, or empty
assertNotNull(plans);
@ -124,14 +124,14 @@ public class PlansServiceTests extends OpenSearchTestCase {
String response = "{\"error\": \"unauthorized_client\", \"error_description\": \"The provided token is invalid or expired\"}";
// When CTI replies with an error code, token must be null. No exception raised
when(this.mockClient.getPlans(anyString()))
when(this.mockClient.getPlans(any(Token.class)))
.thenReturn(SimpleHttpResponse.create(400, response.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
plans = this.plansService.getPlans("anyToken");
plans = this.plansService.getPlans(new Token("anyToken", "Bearer"));
assertNull(plans);
// When CTI does not reply, token must be null and exceptions are raised.
when(this.mockClient.getPlans(anyString())).thenThrow(ExecutionException.class);
plans = this.plansService.getPlans("anyToken");
when(this.mockClient.getPlans(any(Token.class))).thenThrow(ExecutionException.class);
plans = this.plansService.getPlans(new Token("anyToken", "Bearer"));
assertNull(plans);
}
}

View File

@ -27,9 +27,9 @@ import org.junit.Before;
import java.util.List;
import com.wazuh.contentmanager.model.cti.Changes;
import com.wazuh.contentmanager.model.cti.Offset;
import com.wazuh.contentmanager.model.cti.Operation;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.mockito.InjectMocks;
import org.mockito.Mock;

View File

@ -0,0 +1,72 @@
package com.wazuh.contentmanager.rest;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.rest.model.RestResponse;
import com.wazuh.contentmanager.rest.services.RestDeleteSubscriptionAction;
import org.junit.Before;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.core.rest.RestStatus;
import java.io.IOException;
import static org.mockito.Mockito.*;
public class RestDeleteSubscriptionActionTests extends OpenSearchTestCase {
private CtiConsole console;
private RestDeleteSubscriptionAction action;
/**
* Set up the tests
*
* @throws Exception rethrown from parent method
*/
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.console = mock(CtiConsole.class);
this.action = new RestDeleteSubscriptionAction(this.console);
}
/** Test the {@link RestDeleteSubscriptionAction#handleRequest()} method when the token is created (mock).
* The expected response is: {200, RestResponse}
*/
public void testDeleteToken200() throws IOException {
// Mock
Token token = new Token("test_token", "test_type");
when(this.console.getToken()).thenReturn(token);
// Act
BytesRestResponse bytesRestResponse = this.action.handleRequest();
// Expected response
RestResponse expectedResponse = new RestResponse("Subscription deleted successfully", RestStatus.OK.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.OK, bytesRestResponse.status());
}
/** Test the {@link RestDeleteSubscriptionAction#handleRequest()} method when the token has not been created (mock).
* The expected response is: {404, RestResponse}
*/
public void testDeleteToken404() throws IOException {
// Mock
when(this.console.getToken()).thenReturn(null);
// Act
BytesRestResponse bytesRestResponse = this.action.handleRequest();
// Expected response
RestResponse expectedResponse = new RestResponse("Token not found", RestStatus.NOT_FOUND.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.NOT_FOUND, bytesRestResponse.status());
}
}

View File

@ -0,0 +1,70 @@
package com.wazuh.contentmanager.rest;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.rest.model.RestResponse;
import com.wazuh.contentmanager.rest.services.RestGetSubscriptionAction;
import org.junit.Before;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.core.rest.RestStatus;
import java.io.IOException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RestGetSubscriptionActionTests extends OpenSearchTestCase {
private CtiConsole console;
private RestGetSubscriptionAction action;
/**
* Set up the tests
*
* @throws Exception rethrown from parent method
*/
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.console = mock(CtiConsole.class);
this.action = new RestGetSubscriptionAction(this.console);
}
/** Test the {@link RestGetSubscriptionAction#handleRequest()} method when the token is created (mock).
* The expected response is: {200, Token}
*/
public void testGetToken200() throws IOException {
// Mock
Token token = new Token("test_token", "test_type");
when(this.console.getToken()).thenReturn(token);
// Act
BytesRestResponse bytesRestResponse = this.action.handleRequest();
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(token.getAccessToken()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(token.getTokenType()));
assertEquals(RestStatus.OK, bytesRestResponse.status());
}
/** Test the {@link RestGetSubscriptionAction#handleRequest()} method when the token has not been created (mock).
* The expected response is: {404, RestResponse}
*/
public void testGetToken404() throws IOException {
// Mock
when(this.console.getToken()).thenReturn(null);
// Act
BytesRestResponse bytesRestResponse = this.action.handleRequest();
// Expected response
RestResponse expectedResponse = new RestResponse("Token not found", RestStatus.NOT_FOUND.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.NOT_FOUND, bytesRestResponse.status());
}
}

View File

@ -0,0 +1,73 @@
package com.wazuh.contentmanager.rest;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Subscription;
import com.wazuh.contentmanager.rest.model.RestResponse;
import com.wazuh.contentmanager.rest.services.RestPostSubscriptionAction;
import org.junit.Before;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.core.rest.RestStatus;
import java.io.IOException;
import static org.mockito.Mockito.*;
public class RestPostSubscriptionActionTests extends OpenSearchTestCase {
private CtiConsole console;
private RestPostSubscriptionAction action;
/**
* Set up the tests
*
* @throws Exception rethrown from parent method
*/
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.console = mock(CtiConsole.class);
this.action = new RestPostSubscriptionAction(this.console);
}
/** Test the {@link RestPostSubscriptionAction#handleRequest(Subscription)} method when the request is complete.
* The expected response is: {201, RestResponse}
*/
public void testPostToken201() throws IOException {
// Mock
Subscription subscription = new Subscription();
//Act
BytesRestResponse bytesRestResponse = this.action.handleRequest(subscription);
// Expected response
RestResponse expectedResponse = new RestResponse("Subscription created successfully", RestStatus.CREATED.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.CREATED, bytesRestResponse.status());
}
/** Test the {@link RestPostSubscriptionAction#handleRequest(Subscription)} method when the token has not been created (mock).
* The expected response is: {400, RestResponse}
*/
public void testPostToken400() throws IOException {
// Mock
Subscription subscription = new Subscription();
doThrow(new IllegalArgumentException("Missing required parameters")).when(this.console).onPostSubscriptionRequest(subscription);
//Act
BytesRestResponse bytesRestResponse = this.action.handleRequest(subscription);
// Expected response
RestResponse expectedResponse = new RestResponse("Missing required parameters", RestStatus.BAD_REQUEST.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.BAD_REQUEST, bytesRestResponse.status());
}
}

View File

@ -0,0 +1,87 @@
package com.wazuh.contentmanager.rest;
import com.wazuh.contentmanager.cti.console.CtiConsole;
import com.wazuh.contentmanager.cti.console.model.Token;
import com.wazuh.contentmanager.rest.model.RestResponse;
import com.wazuh.contentmanager.rest.services.RestPostUpdateAction;
import org.junit.Before;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.core.rest.RestStatus;
import java.io.IOException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RestPostUpdateActionTests extends OpenSearchTestCase {
private CtiConsole console;
private RestPostUpdateAction action;
/**
* Set up the tests
*
* @throws Exception rethrown from parent method
*/
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.console = mock(CtiConsole.class);
this.action = new RestPostUpdateAction(this.console);
}
/** Test the {@link RestPostUpdateAction#handleRequest()} method when the token is created (mock).
* The expected response is: {200, Token}
*/
public void testGetToken202() throws IOException {
// Mock
Token token = new Token("test_token", "test_type");
when(this.console.getToken()).thenReturn(token);
// Act
BytesRestResponse bytesRestResponse = this.action.handleRequest();
// Expected response
RestResponse expectedResponse = new RestResponse("Update accepted", RestStatus.ACCEPTED.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.ACCEPTED, bytesRestResponse.status());
}
/** Test the {@link RestPostUpdateAction#handleRequest()} method when the token has not been created (mock).
* The expected response is: {404, RestResponse}
*/
public void testGetToken404() throws IOException {
// Mock
when(this.console.getToken()).thenReturn(null);
// Act
BytesRestResponse bytesRestResponse = this.action.handleRequest();
// Expected response
RestResponse expectedResponse = new RestResponse("Token not found", RestStatus.NOT_FOUND.getStatus());
// Assert
assertTrue(bytesRestResponse.content().utf8ToString().contains(expectedResponse.getMessage()));
assertTrue(bytesRestResponse.content().utf8ToString().contains(String.valueOf(expectedResponse.getStatus())));
assertEquals(RestStatus.NOT_FOUND, bytesRestResponse.status());
}
/** Test the {@link RestPostUpdateAction#handleRequest()} method when there is already a request being performed.
* The expected response is: {409, RestResponse}
*/
public void testGetToken409() throws IOException {
// TODO
}
/** Test the {@link RestPostUpdateAction#handleRequest()} method when the rate limit is exceeded.
* The expected response is: {429, RestResponse}
*/
public void testGetToken429() throws IOException {
// TODO
}
}

View File

@ -18,6 +18,10 @@ package com.wazuh.contentmanager.updater;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.transport.client.Client;
@ -44,7 +48,6 @@ import com.wazuh.contentmanager.ContentManagerPlugin;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.model.cti.*;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import org.mockito.InjectMocks;

View File

@ -16,6 +16,10 @@
*/
package com.wazuh.contentmanager.updater;
import com.wazuh.contentmanager.cti.catalog.model.Changes;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.Offset;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
@ -28,7 +32,6 @@ import java.util.List;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.model.cti.*;
import com.wazuh.contentmanager.settings.PluginSettings;
import com.wazuh.contentmanager.utils.Privileged;
import org.mockito.InjectMocks;

View File

@ -22,7 +22,7 @@ import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;
import com.wazuh.contentmanager.model.cti.Operation;
import com.wazuh.contentmanager.cti.catalog.model.Operation;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class JsonPatchTests extends OpenSearchTestCase {

View File

@ -32,7 +32,7 @@ import java.util.Iterator;
import com.wazuh.contentmanager.client.CTIClient;
import com.wazuh.contentmanager.index.ContentIndex;
import com.wazuh.contentmanager.index.ConsumersIndex;
import com.wazuh.contentmanager.model.cti.ConsumerInfo;
import com.wazuh.contentmanager.cti.catalog.model.ConsumerInfo;
import com.wazuh.contentmanager.settings.PluginSettings;
import org.mockito.InjectMocks;
import org.mockito.Mock;