326 lines
11 KiB
Python
326 lines
11 KiB
Python
import logging
|
|
from typing import Optional, List, Dict, Any
|
|
from mcp.server.fastmcp import FastMCP # Updated import path
|
|
from decouple import config
|
|
import splunklib.client as client
|
|
from splunklib import results
|
|
from datetime import datetime, timedelta
|
|
import sys
|
|
import ssl
|
|
import socket
|
|
import traceback
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize the MCP server
|
|
mcp = FastMCP("splunk")
|
|
|
|
# Splunk connection configuration
|
|
SPLUNK_HOST = config("SPLUNK_HOST", default="localhost")
|
|
SPLUNK_PORT = config("SPLUNK_PORT", default=8089, cast=int)
|
|
SPLUNK_USERNAME = config("SPLUNK_USERNAME", default="admin")
|
|
SPLUNK_PASSWORD = config("SPLUNK_PASSWORD")
|
|
SPLUNK_SCHEME = config("SPLUNK_SCHEME", default="https")
|
|
VERIFY_SSL = config("VERIFY_SSL", default="true", cast=bool)
|
|
|
|
def get_splunk_connection():
|
|
"""Helper function to establish Splunk connection"""
|
|
try:
|
|
logger.info(f"🔌 Attempting to connect to Splunk at {SPLUNK_SCHEME}://{SPLUNK_HOST}:{SPLUNK_PORT}")
|
|
logger.info(f"SSL Verification is {'enabled' if VERIFY_SSL else 'disabled'}")
|
|
|
|
# Test basic connectivity first
|
|
try:
|
|
sock = socket.create_connection((SPLUNK_HOST, SPLUNK_PORT), timeout=10)
|
|
sock.close()
|
|
logger.info("✅ Basic TCP connection test successful")
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to establish basic TCP connection: {str(e)}")
|
|
raise
|
|
|
|
# Configure SSL context with detailed logging
|
|
if not VERIFY_SSL:
|
|
logger.info("🔒 Creating custom SSL context with verification disabled")
|
|
ssl_context = ssl.create_default_context()
|
|
ssl_context.check_hostname = False
|
|
ssl_context.verify_mode = ssl.CERT_NONE
|
|
logger.info("✅ SSL context configured with verification disabled")
|
|
else:
|
|
logger.info("🔒 Using default SSL context with verification enabled")
|
|
ssl_context = None
|
|
|
|
# Attempt Splunk connection with detailed logging
|
|
try:
|
|
service = client.connect(
|
|
host=SPLUNK_HOST,
|
|
port=SPLUNK_PORT,
|
|
username=SPLUNK_USERNAME,
|
|
password=SPLUNK_PASSWORD,
|
|
scheme=SPLUNK_SCHEME,
|
|
ssl_context=ssl_context
|
|
)
|
|
logger.info("✅ Successfully established Splunk connection")
|
|
return service
|
|
except ssl.SSLError as e:
|
|
logger.error(f"❌ SSL Error during connection: {str(e)}")
|
|
logger.error(f"SSL Error details: {traceback.format_exc()}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"❌ Error during Splunk connection: {str(e)}")
|
|
logger.error(f"Error details: {traceback.format_exc()}")
|
|
raise
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to connect to Splunk: {str(e)}")
|
|
logger.error(f"Full error details: {traceback.format_exc()}")
|
|
raise
|
|
|
|
@mcp.tool()
|
|
async def search_splunk(
|
|
search_query: str,
|
|
earliest_time: Optional[str] = "-24h",
|
|
latest_time: Optional[str] = "now",
|
|
max_results: Optional[int] = 100
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Execute a Splunk search query and return results
|
|
|
|
Args:
|
|
search_query: The Splunk search query to execute
|
|
earliest_time: Start time for the search (default: 24 hours ago)
|
|
latest_time: End time for the search (default: now)
|
|
max_results: Maximum number of results to return (default: 100)
|
|
|
|
Returns:
|
|
List of search results as dictionaries
|
|
"""
|
|
try:
|
|
service = get_splunk_connection()
|
|
logger.info(f"🔍 Executing Splunk search: {search_query}")
|
|
|
|
# Create the search job
|
|
kwargs_search = {
|
|
"earliest_time": earliest_time,
|
|
"latest_time": latest_time,
|
|
"preview": False,
|
|
"exec_mode": "blocking" # Make the search synchronous
|
|
}
|
|
|
|
# Remove 'search' if it's already in the query
|
|
if not search_query.lower().startswith('search '):
|
|
search_query = f"search {search_query}"
|
|
|
|
job = service.jobs.create(search_query, **kwargs_search)
|
|
|
|
# Get the results
|
|
results_list = []
|
|
|
|
# Get all results at once in JSON format
|
|
result_stream = job.results(output_mode='json', count=max_results)
|
|
|
|
# Parse the JSON response
|
|
import json
|
|
response_data = json.loads(result_stream.read().decode('utf-8'))
|
|
|
|
if 'results' in response_data:
|
|
results_list = response_data['results']
|
|
|
|
logger.info(f"✅ Search completed. Found {len(results_list)} results")
|
|
return results_list[:max_results]
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error executing Splunk search: {str(e)}")
|
|
raise
|
|
|
|
@mcp.tool()
|
|
async def list_indexes() -> List[Dict[str, Any]]:
|
|
"""
|
|
List all available Splunk indexes
|
|
|
|
Returns:
|
|
List of dictionaries containing index information
|
|
"""
|
|
try:
|
|
service = get_splunk_connection()
|
|
logger.info("📊 Fetching Splunk indexes...")
|
|
indexes = []
|
|
|
|
for index in service.indexes:
|
|
try:
|
|
index_info = {
|
|
"name": index.name,
|
|
"total_event_count": index.get("totalEventCount", "0"),
|
|
"current_size": index.get("currentDBSizeMB", "0"),
|
|
"max_size": index.get("maxTotalDataSizeMB", "0"),
|
|
"earliest_time": index.get("earliestTime", "0"),
|
|
"latest_time": index.get("latestTime", "0")
|
|
}
|
|
indexes.append(index_info)
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Error accessing metadata for index {index.name}: {str(e)}")
|
|
# Add basic information if metadata access fails
|
|
indexes.append({
|
|
"name": index.name,
|
|
"total_event_count": "0",
|
|
"current_size": "0",
|
|
"max_size": "0",
|
|
"earliest_time": "0",
|
|
"latest_time": "0"
|
|
})
|
|
|
|
logger.info(f"✅ Found {len(indexes)} indexes")
|
|
return indexes
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error listing indexes: {str(e)}")
|
|
raise
|
|
|
|
@mcp.tool()
|
|
async def list_users() -> List[Dict[str, Any]]: # Made async
|
|
"""
|
|
List all Splunk users
|
|
|
|
Returns:
|
|
List of dictionaries containing user information
|
|
"""
|
|
try:
|
|
service = get_splunk_connection()
|
|
logger.info("👥 Fetching Splunk users...")
|
|
users = []
|
|
|
|
for user in service.users:
|
|
user_info = {
|
|
"username": user.name,
|
|
"real_name": user["realname"],
|
|
"email": user["email"],
|
|
"roles": [role for role in user.role_entities()],
|
|
"default_app": user["defaultApp"],
|
|
"type": user["type"]
|
|
}
|
|
users.append(user_info)
|
|
|
|
logger.info(f"✅ Found {len(users)} users")
|
|
return users
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error listing users: {str(e)}")
|
|
raise
|
|
|
|
@mcp.tool()
|
|
async def list_kvstore_collections() -> List[Dict[str, Any]]: # Made async
|
|
"""
|
|
List all KV store collections
|
|
|
|
Returns:
|
|
List of dictionaries containing collection information
|
|
"""
|
|
try:
|
|
service = get_splunk_connection()
|
|
logger.info("📦 Fetching KV store collections...")
|
|
collections = []
|
|
|
|
for app in service.apps:
|
|
try:
|
|
kvstore = app.kvstore
|
|
for collection in kvstore:
|
|
collection_info = {
|
|
"name": collection.name,
|
|
"app": app.name,
|
|
"fields": collection.fields,
|
|
"accelerated_fields": collection.accelerated_fields,
|
|
"record_count": len(collection.data)
|
|
}
|
|
collections.append(collection_info)
|
|
except Exception as e:
|
|
logger.warning(f"⚠️ Error accessing KV store for app {app.name}: {str(e)}")
|
|
continue
|
|
|
|
logger.info(f"✅ Found {len(collections)} KV store collections")
|
|
return collections
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error listing KV store collections: {str(e)}")
|
|
raise
|
|
|
|
@mcp.tool()
|
|
async def create_kvstore_collection( # Made async
|
|
collection_name: str,
|
|
app_name: str,
|
|
fields: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a new KV store collection
|
|
|
|
Args:
|
|
collection_name: Name of the collection to create
|
|
app_name: Name of the app to create the collection in
|
|
fields: Dictionary defining the collection schema
|
|
|
|
Returns:
|
|
Dictionary containing the created collection information
|
|
"""
|
|
try:
|
|
service = get_splunk_connection()
|
|
logger.info(f"📝 Creating KV store collection: {collection_name} in app: {app_name}")
|
|
app = service.apps[app_name]
|
|
|
|
# Create the collection
|
|
collection = app.kvstore.create(collection_name, fields)
|
|
|
|
result = {
|
|
"name": collection.name,
|
|
"app": app_name,
|
|
"fields": collection.fields,
|
|
"accelerated_fields": collection.accelerated_fields
|
|
}
|
|
|
|
logger.info("✅ KV store collection created successfully")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error creating KV store collection: {str(e)}")
|
|
raise
|
|
|
|
@mcp.tool()
|
|
async def delete_kvstore_collection( # Made async
|
|
collection_name: str,
|
|
app_name: str
|
|
) -> bool:
|
|
"""
|
|
Delete a KV store collection
|
|
|
|
Args:
|
|
collection_name: Name of the collection to delete
|
|
app_name: Name of the app containing the collection
|
|
|
|
Returns:
|
|
True if deletion was successful
|
|
"""
|
|
try:
|
|
service = get_splunk_connection()
|
|
logger.info(f"🗑️ Deleting KV store collection: {collection_name} from app: {app_name}")
|
|
app = service.apps[app_name]
|
|
|
|
# Delete the collection
|
|
app.kvstore[collection_name].delete()
|
|
logger.info("✅ KV store collection deleted successfully")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error deleting KV store collection: {str(e)}")
|
|
raise
|
|
|
|
if __name__ == "__main__":
|
|
# Get transport mode from command line argument, default to stdio
|
|
transport_mode = "stdio"
|
|
if len(sys.argv) > 1 and sys.argv[1].lower() == "sse":
|
|
transport_mode = "sse"
|
|
logger.info("🚀 Starting Splunk MCP server in SSE mode")
|
|
else:
|
|
logger.info("🚀 Starting Splunk MCP server in STDIO mode")
|
|
|
|
mcp.run(transport=transport_mode) |