Initial commit

This commit is contained in:
2025-02-04 10:42:57 -06:00
commit 7b977abeb6
13 changed files with 1972 additions and 0 deletions

24
Dockerfile Normal file
View File

@@ -0,0 +1,24 @@
# Usar una imagen base de Python
FROM python:3.9-slim
# Instalar dependencias del sistema necesarias para OpenCV y zbar
RUN apt-get update && apt-get install -y \
libzbar0 \
libgl1-mesa-glx \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Directorio de trabajo
WORKDIR /app
# Copiar requirements.txt primero para aprovechar la caché de Docker
COPY requirements.txt .
# Instalar dependencias de Python
RUN pip install --no-cache-dir -r requirements.txt
# Copiar el código fuente
COPY . .
# Comando para ejecutar la aplicación
CMD ["python", "qr_detector.py"]

Binary file not shown.

Binary file not shown.

Binary file not shown.

50
config copy.yaml Normal file
View File

@@ -0,0 +1,50 @@
# webhooks:
enabled: true
endpoints:
- url: 'https://webhook.site/92f87e0c-3611-43b1-8010-b873835b9e75'
method: 'POST' # POST, PUT, GET
headers:
Authorization: 'Bearer your-token'
Content-Type: 'application/json'
retry:
max_attempts: 3
delay_seconds: 5
notification_rules:
send_on_new: true # Notificar nuevos QRs
send_on_repeat: false # Notificar QRs repetidos
headless:
enabled: true # Activar modo headless
save_images: true # Guardar imágenes de QRs detectados
image_output_dir: '/var/log/qr_detector/images' # Directorio para guardar imágenes
webhook_url: 'http://your-webhook-url' # URL para notificaciones (opcional)
# Configuración del detector QR
database:
enabled: true
path: 'qr_codes.db'
table_name: 'qr_scans'
camera:
id: 0 # ID de la cámara a usar
min_delay: 2.0 # Tiempo mínimo entre lecturas del mismo código
logging:
level: INFO
format: '%(asctime)s - %(levelname)s - %(message)s'
output:
console: true
file:
enabled: true
path: 'qr_detector.log'
display:
window_name: 'QR Detector'
rectangle:
color: [0, 255, 0] # Color BGR (verde)
thickness: 2
corner_length: 30 # Longitud de las líneas de las esquinas
text:
font_scale: 0.6
color: [255, 255, 255] # Color BGR (blanco)
thickness: 2
background_opacity: 0.5 # Opacidad del fondo del texto
margin_top: 40 # Píxeles sobre el código QR

55
config.yaml Normal file
View File

@@ -0,0 +1,55 @@
# webhooks:
# enabled: true
# endpoints:
# - url: 'https://webhook.site/92f87e0c-3611-43b1-8010-b873835b9e75'
# method: 'POST' # POST, PUT, GET
# headers:
# Authorization: 'Bearer your-token'
# Content-Type: 'application/json'
# retry:
# max_attempts: 3
# delay_seconds: 5
# notification_rules:
# send_on_new: true # Notificar nuevos QRs
# send_on_repeat: false # Notificar QRs repetidos
headless:
enabled: true # Activar modo headless
save_images: true # Guardar imágenes de QRs detectados
image_output_dir: '/var/log/qr_detector/images' # Directorio para guardar imágenes
webhook_url: 'http://your-webhook-url' # URL para notificaciones (opcional)
# Configuración del detector QR
database:
enabled: true
path: 'qr_codes.db'
table_name: 'qr_scans'
camera:
id: "rtsp://172.20.5.201:8080/h264.sdp" # ID de la cámara a usar
min_delay: 2.0 # Tiempo mínimo entre lecturas del mismo código
logging:
level: INFO
format: '%(asctime)s - %(levelname)s - %(message)s'
output:
console: true
file:
enabled: true
path: 'qr_detector.log'
display:
window_name: 'QR Detector'
rectangle:
color: [0, 255, 0] # Color BGR (verde)
thickness: 2
corner_length: 30 # Longitud de las líneas de las esquinas
text:
font_scale: 0.6
color: [255, 255, 255] # Color BGR (blanco)
thickness: 2
background_opacity: 0.5 # Opacidad del fondo del texto
margin_top: 40 # Píxeles sobre el código QR
# URL del webhook (opcional)
api:
url: "https://webhook.site/92f87e0c-3611-43b1-8010-b873835b9e75"
api_key: "tu-api-key-aqui"
batch_size: 1 # Número de QRs a acumular antes de enviar

86
db_handler.py Normal file
View File

@@ -0,0 +1,86 @@
import sqlite3
import logging
from contextlib import contextmanager
from datetime import datetime
class DBHandler:
def __init__(self, db_path='qr_codes.db'):
self.db_path = db_path
self.logger = logging.getLogger(__name__)
self.setup_database()
@contextmanager
def get_connection(self):
"""Administra la conexión a la base de datos"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
yield conn
except sqlite3.Error as e:
self.logger.error(f"Error de conexión a la base de datos: {e}")
raise
finally:
if conn:
conn.close()
def setup_database(self):
"""Crea la tabla si no existe"""
create_table_sql = '''
CREATE TABLE IF NOT EXISTS qr_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qr_data TEXT NOT NULL UNIQUE,
camera_name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
'''
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(create_table_sql)
conn.commit()
self.logger.info("Base de datos inicializada correctamente")
except sqlite3.Error as e:
self.logger.error(f"Error creando la tabla: {e}")
raise
def check_qr_exists(self, qr_data: str) -> bool:
"""Verifica si un código QR ya existe en la base de datos"""
query = 'SELECT COUNT(*) FROM qr_codes WHERE qr_data = ?'
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, (qr_data,))
(count,) = cursor.fetchone()
return count > 0
except sqlite3.Error as e:
self.logger.error(f"Error verificando QR: {e}")
return False
def save_qr(self, qr_data: str,camera_name: str) -> bool:
"""Guarda un nuevo código QR en la base de datos"""
if self.check_qr_exists(qr_data):
return False
insert_sql = 'INSERT INTO qr_codes (qr_data,camera_name) VALUES (?, ?)'
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(insert_sql, (qr_data,camera_name))
conn.commit()
self.logger.info(f"QR guardado en DB: {qr_data}")
return True
except sqlite3.Error as e:
self.logger.error(f"Error guardando QR: {e}")
return False
def get_all_qrs(self) -> list:
"""Obtiene todos los códigos QR almacenados"""
query = 'SELECT qr_data, created_at FROM qr_codes ORDER BY created_at DESC'
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
except sqlite3.Error as e:
self.logger.error(f"Error obteniendo QRs: {e}")
return []

BIN
qr_codes.db Normal file

Binary file not shown.

274
qr_detector copy.py Normal file
View File

@@ -0,0 +1,274 @@
# Agregar al inicio del archivo, después de los imports existentes
import cv2
from pyzbar import pyzbar
import os
import json
from datetime import datetime
import requests # Para webhooks si se necesitan
import time
import logging
import sys
import yaml
from pathlib import Path
from typing import Optional, Tuple, List
from dataclasses import dataclass
from contextlib import contextmanager
from db_handler import DatabaseHandler, DatabaseError
@dataclass
class QRData:
"""Clase para almacenar información del código QR"""
data: str
code_type: str
coordinates: Tuple[int, int, int, int]
timestamp: float
class QRDetectorError(Exception):
"""Clase base para excepciones del detector QR"""
pass
class CameraError(QRDetectorError):
"""Error relacionado con la cámara"""
pass
class QRDecodeError(QRDetectorError):
"""Error en la decodificación del QR"""
pass
class ConfigError(QRDetectorError):
"""Error en la configuración"""
pass
class QRDetector:
def __init__(self, config_path: str = 'config.yaml'):
self.config = self.load_config(config_path)
self.setup_logging()
self.camera_id = self.config['camera']['id']
self.min_delay = self.config['camera']['min_delay']
self.camera = None
self.last_code = None
self.last_time = 0
# Inicializar base de datos si está habilitada
if self.config['database']['enabled']:
try:
self.db = DatabaseHandler(self.config['database'])
self.logger.info("Base de datos inicializada correctamente")
except DatabaseError as e:
self.logger.error(f"Error inicializando base de datos: {e}")
raise
def load_config(self, config_path: str) -> dict:
"""Carga la configuración desde el archivo YAML"""
try:
with open(config_path, 'r') as file:
return yaml.safe_load(file)
except Exception as e:
raise ConfigError(f"Error cargando configuración: {e}")
def setup_logging(self):
"""Configura el sistema de logging basado en la configuración"""
log_config = self.config['logging']
handlers = []
if log_config['output']['console']:
handlers.append(logging.StreamHandler(sys.stdout))
if log_config['output']['file']['enabled']:
handlers.append(logging.FileHandler(log_config['output']['file']['path']))
logging.basicConfig(
level=getattr(logging, log_config['level']),
format=log_config['format'],
handlers=handlers
)
self.logger = logging.getLogger(__name__)
@contextmanager
def initialize_camera(self):
"""Inicializa la cámara de manera segura usando context manager"""
try:
self.camera = cv2.VideoCapture(self.camera_id)
if not self.camera.isOpened():
raise CameraError("No se pudo abrir la cámara")
yield self.camera
finally:
if self.camera:
self.camera.release()
cv2.destroyAllWindows()
self.logger.info("Recursos de la cámara liberados")
def process_frame(self, frame) -> List[QRData]:
"""Procesa un frame y retorna la lista de códigos QR detectados"""
try:
qr_codes = pyzbar.decode(frame)
current_time = time.time()
results = []
for qr_code in qr_codes:
try:
data = qr_code.data.decode('utf-8')
code_type = qr_code.type
coordinates = qr_code.rect
if (data != self.last_code or
(current_time - self.last_time) > self.min_delay):
qr_data = QRData(
data=data,
code_type=code_type,
coordinates=coordinates,
timestamp=current_time
)
results.append(qr_data)
self.last_code = data
self.last_time = current_time
# Guardar en base de datos si está habilitada
if self.config['database']['enabled']:
try:
qr_dict = {
'data': data,
'code_type': code_type,
'coordinates': coordinates,
'timestamp': current_time
}
self.db.save_qr_code(qr_dict)
scan_count = self.db.get_scan_count(data)
self.logger.info(f"QR guardado en DB. Total escaneos: {scan_count}")
except DatabaseError as e:
self.logger.error(f"Error guardando en base de datos: {e}")
except UnicodeDecodeError as e:
self.logger.error(f"Error decodificando QR: {e}")
raise QRDecodeError("Error al decodificar el contenido del QR")
return results
except Exception as e:
self.logger.error(f"Error procesando frame: {e}")
raise QRDetectorError(f"Error en el procesamiento del frame: {e}")
def draw_qr_info(self, frame, qr_data: QRData):
"""Dibuja la información del QR en el frame con efectos visuales mejorados"""
try:
display_config = self.config['display']
x, y, w, h = qr_data.coordinates
# Dibujar rectángulo principal
cv2.rectangle(
frame,
(x, y),
(x + w, y + h),
tuple(display_config['rectangle']['color']),
display_config['rectangle']['thickness']
)
# Dibujar esquinas del QR
l = 30 # longitud de las líneas de las esquinas
# Esquina superior izquierda
cv2.line(frame, (x, y), (x + l, y), (0, 255, 255), 2)
cv2.line(frame, (x, y), (x, y + l), (0, 255, 255), 2)
# Esquina superior derecha
cv2.line(frame, (x + w, y), (x + w - l, y), (0, 255, 255), 2)
cv2.line(frame, (x + w, y), (x + w, y + l), (0, 255, 255), 2)
# Esquina inferior izquierda
cv2.line(frame, (x, y + h), (x + l, y + h), (0, 255, 255), 2)
cv2.line(frame, (x, y + h), (x, y + h - l), (0, 255, 255), 2)
# Esquina inferior derecha
cv2.line(frame, (x + w, y + h), (x + w - l, y + h), (0, 255, 255), 2)
cv2.line(frame, (x + w, y + h), (x + w, y + h - l), (0, 255, 255), 2)
# Crear fondo semi-transparente para el texto
overlay = frame.copy()
cv2.rectangle(
overlay,
(x, y - 40),
(x + w, y),
(0, 0, 0),
-1
)
frame = cv2.addWeighted(overlay, 0.5, frame, 0.5, 0)
# Mostrar información
display_text = f"{qr_data.code_type}: {qr_data.data}"
cv2.putText(
frame,
display_text,
(x + 5, y - 15),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255), # Color blanco para mejor contraste
2
)
# Contador de escaneos si está disponible
if self.config['database']['enabled']:
try:
scan_count = self.db.get_scan_count(qr_data.data)
count_text = f"Escaneos: {scan_count}"
cv2.putText(
frame,
count_text,
(x + w - 100, y + h + 25),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 255),
1
)
except DatabaseError:
pass
self.logger.info(f"QR detectado - Tipo: {qr_data.code_type}, Datos: {qr_data.data}")
except Exception as e:
self.logger.error(f"Error dibujando información del QR: {e}")
raise QRDetectorError(f"Error al dibujar información: {e}")
def run(self):
"""Método principal para ejecutar el detector"""
self.logger.info("Iniciando detector de códigos QR...")
with self.initialize_camera() as camera:
while True:
try:
success, frame = camera.read()
if not success:
raise CameraError("Error capturando frame")
qr_codes = self.process_frame(frame)
for qr_data in qr_codes:
self.draw_qr_info(frame, qr_data)
cv2.imshow(self.config['display']['window_name'], frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
self.logger.info("Cerrando aplicación...")
break
except CameraError as e:
self.logger.error(f"Error de cámara: {e}")
break
except QRDecodeError as e:
self.logger.error(f"Error de decodificación: {e}")
continue
except Exception as e:
self.logger.error(f"Error inesperado: {e}")
break
def main():
try:
detector = QRDetector()
detector.run()
except ConfigError as e:
print(f"Error en la configuración: {e}")
sys.exit(1)
except Exception as e:
print(f"Error crítico en la aplicación: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

1085
qr_detector.log Normal file

File diff suppressed because it is too large Load Diff

267
qr_detector.py Normal file
View File

@@ -0,0 +1,267 @@
import os
import cv2
from pyzbar import pyzbar
import time
import logging
import sys
import yaml
import requests
from pathlib import Path
from typing import Optional, Tuple, List
from dataclasses import dataclass
from contextlib import contextmanager
import threading
from queue import Queue
from db_handler import DBHandler
@dataclass
class QRData:
"""Clase para almacenar información del código QR"""
data: str
code_type: str
coordinates: Tuple[int, int, int, int]
timestamp: float
class QRDetector:
def __init__(self, config_path: str = 'config.yaml'):
self.config = self.load_config(config_path)
self.setup_logging()
self.camera_id = self.config['camera']['id']
self.camera_name = self.config.get('camera', {}).get('name', 'NO_CAMERA_NAME')
self.min_delay = self.config['camera']['min_delay']
self.camera = None
self.last_code = None
self.last_time = 0
self.lotes = []
self.api_queue = Queue()
self.running = True
# Inicializar base de datos
self.db = DBHandler(self.config.get('database', {}).get('path', 'qr_codes.db'))
# Iniciar thread para procesar envíos a la API
self.api_thread = threading.Thread(target=self.api_worker)
self.api_thread.daemon = True
self.api_thread.start()
self.headless = self.config.get('headless', {}).get('enabled', False)
# Crear directorio para imágenes si es necesario
if self.headless and self.config['headless'].get('save_images', False):
os.makedirs(self.config['headless']['image_output_dir'], exist_ok=True)
# Verificar configuración de API
if 'api' in self.config:
self.logger.info(f"Configuración API cargada:")
self.logger.info(f"URL: {self.config['api']['url']}")
self.logger.info(f"Batch size: {self.config['api'].get('batch_size', 1)}")
else:
self.logger.error("No se encontró configuración de API")
def load_config(self, config_path: str) -> dict:
"""Carga la configuración desde el archivo YAML"""
try:
with open(config_path, 'r') as file:
return yaml.safe_load(file)
except Exception as e:
self.logger.error(f"Error cargando configuración: {e}")
sys.exit(1)
def setup_logging(self):
"""Configura el sistema de logging"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('qr_detector.log')
]
)
self.logger = logging.getLogger(__name__)
def api_worker(self):
"""Worker thread para procesar envíos a la API"""
while self.running:
try:
lotes = self.api_queue.get(timeout=1)
if lotes:
self._send_to_api(lotes)
self.api_queue.task_done()
except:
continue
def _send_to_api(self, lotes):
"""Envía los lotes a la API (método interno)"""
try:
api_url = self.config['api']['url']
headers = {
'Content-Type': 'application/json',
'x-api-key': self.config['api']['api_key']
}
payload = {
"lotes": lotes
}
self.logger.info(f"Enviando lotes a API: {lotes}")
response = requests.post(api_url, json=payload, headers=headers, timeout=5)
if response.status_code == 200:
self.logger.info(f"Lotes enviados exitosamente")
else:
self.logger.error(f"Error enviando lotes: {response.status_code}")
except Exception as e:
self.logger.error(f"Error en envío a API: {e}")
def send_to_api(self):
"""Encola los lotes para envío asíncrono"""
if not self.lotes:
return
# Copiar los lotes actuales y limpiar la lista
lotes_to_send = self.lotes.copy()
self.lotes = []
# Encolar para envío asíncrono
self.api_queue.put(lotes_to_send)
@contextmanager
def initialize_camera(self):
"""Inicializa la cámara"""
try:
self.camera = cv2.VideoCapture(self.camera_id)
if not self.camera.isOpened():
self.logger.error("No se pudo abrir la cámara")
sys.exit(1)
yield self.camera
finally:
if self.camera:
self.camera.release()
cv2.destroyAllWindows()
self.send_to_api() # Enviar lotes pendientes al cerrar
self.logger.info("Recursos liberados")
def process_frame(self, frame) -> List[QRData]:
"""Procesa un frame y retorna los códigos QR detectados"""
try:
qr_codes = pyzbar.decode(frame)
current_time = time.time()
results = []
for qr_code in qr_codes:
try:
data = qr_code.data.decode('utf-8')
code_type = qr_code.type
coordinates = qr_code.rect
if (data != self.last_code or
(current_time - self.last_time) > self.min_delay):
# Verificar si el QR ya existe en la base de datos local
if self.db.check_qr_exists(data):
self.logger.info(f"QR ya existe en la base de datos: {data}")
continue
# Guardar en la base de datos local
if self.db.save_qr(data,self.camera_name):
self.logger.info(f"QR guardado en base de datos: {data}")
qr_data = QRData(
data=data,
code_type=code_type,
coordinates=coordinates,
timestamp=current_time
)
results.append(qr_data)
# Añadir a lotes para envío a API solo si es nuevo
if data not in self.lotes:
self.lotes.append(data)
self.logger.info(f"Añadido a lotes para API. Total: {len(self.lotes)}")
self.last_code = data
self.last_time = current_time
except Exception as e:
self.logger.error(f"Error procesando QR individual: {e}")
return results
except Exception as e:
self.logger.error(f"Error procesando frame: {e}")
return []
def draw_qr_info(self, frame, qr_data: QRData):
"""Dibuja la información del QR en el frame"""
try:
x, y, w, h = qr_data.coordinates
# Dibujar rectángulo
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# Mostrar información
display_text = f"{qr_data.code_type}: {qr_data.data}"
cv2.putText(
frame,
display_text,
(x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
2
)
except Exception as e:
self.logger.error(f"Error dibujando información: {e}")
def run(self):
"""Método principal para ejecutar el detector"""
self.logger.info("Iniciando detector de códigos QR...")
batch_size = self.config['api'].get('batch_size', 1)
self.logger.info(f"Batch size configurado: {batch_size}")
with self.initialize_camera() as camera:
while True:
try:
success, frame = camera.read()
if not success:
self.logger.error("Error capturando frame")
break
qr_codes = self.process_frame(frame)
for qr_data in qr_codes:
if self.headless:
self.save_frame_with_qr(frame, qr_data)
else:
self.draw_qr_info(frame, qr_data)
cv2.imshow('QR Detector', frame)
if not self.headless:
if cv2.waitKey(1) & 0xFF == ord('q'):
self.running = False
if self.lotes:
self.logger.info(f"Enviando {len(self.lotes)} lotes pendientes...")
self.send_to_api()
self.api_queue.join()
break
# Verificar y enviar lotes
if len(self.lotes) >= batch_size:
self.send_to_api()
except Exception as e:
self.logger.error(f"Error en el bucle principal: {e}")
continue
def main():
detector = QRDetector()
detector.run()
if __name__ == "__main__":
main()

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
opencv-python>=4.5.0
pyzbar>=0.1.8
numpy>=1.19.0
pyyaml>=5.4.0
requests>=2.32.3

126
webhook_handler.py Normal file
View File

@@ -0,0 +1,126 @@
import requests
import logging
import time
from typing import Dict, Any
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class WebhookConfig:
url: str
method: str
headers: Dict[str, str]
retry: Dict[str, int]
class WebhookError(Exception):
"""Clase para errores relacionados con webhooks"""
pass
class WebhookHandler:
def __init__(self, webhook_config: dict):
self.config = webhook_config
self.logger = logging.getLogger(__name__)
self.session = requests.Session()
def prepare_payload(self, qr_data: dict, scan_count: int) -> dict:
"""Prepara el payload para el webhook"""
return {
"event_type": "qr_detected",
"timestamp": datetime.now().isoformat(),
"qr_data": {
"content": qr_data.get('data'),
"type": qr_data.get('code_type'),
"scan_count": scan_count,
"coordinates": qr_data.get('coordinates'),
"detection_timestamp": qr_data.get('timestamp')
},
"device_info": {
"id": "qr_detector_01", # Puedes configurar esto
"location": "main_entrance" # Puedes configurar esto
}
}
def send_webhook(self, endpoint: WebhookConfig, payload: dict) -> bool:
"""Envía el webhook con reintentos"""
max_attempts = endpoint.retry['max_attempts']
delay = endpoint.retry['delay_seconds']
for attempt in range(max_attempts):
try:
response = self.session.request(
method=endpoint.method,
url=endpoint.url,
json=payload,
headers=endpoint.headers,
timeout=10
)
if response.ok:
self.logger.info(f"Webhook enviado exitosamente a {endpoint.url}")
return True
self.logger.warning(
f"Intento {attempt + 1}/{max_attempts} fallido. "
f"Status: {response.status_code}, "
f"Response: {response.text[:100]}"
)
except requests.RequestException as e:
self.logger.error(f"Error en intento {attempt + 1}/{max_attempts}: {e}")
if attempt < max_attempts - 1:
time.sleep(delay)
self.logger.error(f"Todos los intentos de webhook fallaron para {endpoint.url}")
return False
def notify(self, qr_data: dict, scan_count: int) -> None:
"""Maneja el envío de notificaciones basado en las reglas configuradas"""
try:
# Verificar reglas de notificación
is_new = scan_count == 1
should_notify = (
(is_new and self.config['notification_rules']['send_on_new']) or
(not is_new and self.config['notification_rules']['send_on_repeat'])
)
if not should_notify:
return
payload = self.prepare_payload(qr_data, scan_count)
for endpoint_config in self.config['endpoints']:
endpoint = WebhookConfig(
url=endpoint_config['url'],
method=endpoint_config['method'],
headers=endpoint_config['headers'],
retry=endpoint_config['retry']
)
self.send_webhook(endpoint, payload)
except Exception as e:
self.logger.error(f"Error procesando notificación: {e}")
raise WebhookError(f"Error en notificación: {e}")
def test_connection(self) -> bool:
"""Prueba las conexiones a todos los endpoints configurados"""
test_payload = {
"event_type": "connection_test",
"timestamp": datetime.now().isoformat()
}
all_successful = True
for endpoint_config in self.config['endpoints']:
endpoint = WebhookConfig(
url=endpoint_config['url'],
method=endpoint_config['method'],
headers=endpoint_config['headers'],
retry=endpoint_config['retry']
)
if not self.send_webhook(endpoint, test_payload):
all_successful = False
return all_successful