Add tool for pulling indexes and sourcetypes

This commit is contained in:
Cameron Schmidt
2025-03-21 20:52:24 -04:00
parent 2b7b0104c2
commit 346f8e383e

View File

@@ -1,4 +1,5 @@
import logging
import json
from typing import Optional, List, Dict, Any
from mcp.server.fastmcp import FastMCP # Updated import path
from decouple import config
@@ -61,7 +62,9 @@ def get_splunk_connection():
username=SPLUNK_USERNAME,
password=SPLUNK_PASSWORD,
scheme=SPLUNK_SCHEME,
ssl_context=ssl_context
ssl_context=ssl_context,
owner="-",
app="-"
)
logger.info("✅ Successfully established Splunk connection")
return service
@@ -123,7 +126,6 @@ async def search_splunk(
result_stream = job.results(output_mode='json', count=max_results)
# Parse the JSON response
import json
response_data = json.loads(result_stream.read().decode('utf-8'))
if 'results' in response_data:
@@ -136,7 +138,29 @@ async def search_splunk(
logger.error(f"❌ Error executing Splunk search: {str(e)}")
raise
@mcp.tool()
async def list_saved_searches() -> List[Dict[str, Any]]:
    """
    List every saved search configured on the Splunk instance.

    Useful as a source of example queries when the available
    indexes/sourcetypes are unknown.

    Returns:
        List of dictionaries, each with the saved search's
        ``name``, ``description``, and ``search`` (the SPL query).

    Raises:
        Exception: Propagates any connection or API error after logging it.
    """
    try:
        service = get_splunk_connection()
        # Project each saved-search entity down to the three fields we expose.
        return [
            {
                "name": saved.name,
                "description": saved.description,
                "search": saved.search,
            }
            for saved in service.saved_searches
        ]
    except Exception as e:
        logger.error(f"❌ Error listing saved searches: {str(e)}")
        raise
@mcp.tool()
async def get_index_metadata(index_name: str) -> Dict[str, Any]:
@@ -180,6 +204,49 @@ async def get_index_metadata(index_name: str) -> Dict[str, Any]:
logger.error(f"❌ Error listing indexes: {str(e)}")
raise
@mcp.tool()
async def get_indexes_and_sourcetypes() -> List[Dict[str, Any]]:
    """
    Get all Splunk indexes and their associated sourcetypes.

    Runs a blocking tstats search over the last 24 hours and returns
    per-(index, sourcetype) event counts.

    Returns:
        List of dictionaries containing event counts by index and sourcetype.

    Raises:
        Exception: Propagates any connection or search error after logging it.
    """
    try:
        service = get_splunk_connection()
        # Blocking execution so results are ready as soon as create() returns.
        kwargs_search = {
            "earliest_time": "-24h",
            "latest_time": "now",
            "preview": False,
            "exec_mode": "blocking"  # Make the search synchronous
        }
        # NOTE(review): the WHERE clause is a tautology — presumably intended
        # to force internal indexes into the tstats results; confirm intent.
        search_query = "| tstats count AS event_count WHERE (index!=_internal OR index=_internal) by index, sourcetype"
        logger.info("🔍 Executing Splunk search to get indexes and sourcetypes")
        job = service.jobs.create(search_query, **kwargs_search)
        # Fetch everything in one JSON payload (count=0 means "no limit").
        result_stream = job.results(output_mode='json', count=0)
        response_data = json.loads(result_stream.read().decode('utf-8'))
        # Fixed: annotation now matches the actual return value (a list of
        # row dicts), and .get() replaces the two-step membership check.
        results_list = response_data.get('results', [])
        logger.info(f"✅ Search completed. Found {len(results_list)} results")
        return results_list
    except Exception as e:
        logger.error(f"❌ Error executing Splunk search: {str(e)}")
        raise
@mcp.tool()
async def list_users() -> List[Dict[str, Any]]: # Made async
"""
@@ -324,4 +391,4 @@ if __name__ == "__main__":
else:
logger.info("🚀 Starting Splunk MCP server in STDIO mode")
mcp.run(transport=transport_mode)
mcp.run(transport=transport_mode)