"""Sistema de metricas e telemetria para pypix-api.
Este modulo coleta e reporta metricas de uso da biblioteca para monitoramento
e otimizacao de performance.
"""
import json
import os
import threading
import time
from collections import Counter, defaultdict
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
[docs]
@dataclass
class MetricEntry:
"""Entrada de metrica individual."""
name: str
value: float
timestamp: datetime
tags: dict[str, str] = field(default_factory=dict)
unit: str = 'count'
[docs]
@dataclass
class APICallMetric:
"""Metrica para chamadas de API."""
method: str
endpoint: str
status_code: int
response_time: float
timestamp: datetime
bank: str
error: str | None = None
[docs]
class MetricsCollector:
"""Coletor central de metricas."""
_instance = None
_lock = threading.Lock()
[docs]
def __new__(cls):
"""Singleton pattern for metrics collector."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
[docs]
def __init__(self):
"""Initialize metrics collector."""
if hasattr(self, '_initialized'):
return
self._initialized = True
self.metrics: list[MetricEntry] = []
self.api_calls: list[APICallMetric] = []
self.counters: dict[str, int] = Counter()
self.gauges: dict[str, float] = {}
self.histograms: dict[str, list[float]] = defaultdict(list)
self.start_time = datetime.now()
self._lock = threading.Lock()
# Enable/disable metrics collection
self.enabled = os.getenv('PYPIX_METRICS_ENABLED', 'true').lower() == 'true'
# Auto-flush configuration
self.auto_flush_interval = int(
os.getenv('PYPIX_METRICS_FLUSH_INTERVAL', '300')
) # 5 minutes
self.max_metrics = int(os.getenv('PYPIX_METRICS_MAX_BUFFER', '1000'))
if self.enabled:
self._setup_auto_flush()
def _setup_auto_flush(self):
"""Setup automatic metrics flushing."""
def flush_periodically():
while self.enabled:
time.sleep(self.auto_flush_interval)
if len(self.metrics) > 0 or len(self.api_calls) > 0:
self.flush_metrics()
flush_thread = threading.Thread(target=flush_periodically, daemon=True)
flush_thread.start()
[docs]
def increment(
self, name: str, value: int = 1, tags: dict[str, str] | None = None
) -> None:
"""Increment a counter metric."""
if not self.enabled:
return
with self._lock:
key = f'{name}:{json.dumps(tags or {}, sort_keys=True)}'
self.counters[key] += value
self.metrics.append(
MetricEntry(
name=name,
value=value,
timestamp=datetime.now(),
tags=tags or {},
unit='count',
)
)
self._check_buffer_size()
[docs]
def gauge(
self, name: str, value: float, tags: dict[str, str] | None = None
) -> None:
"""Set a gauge metric value."""
if not self.enabled:
return
with self._lock:
key = f'{name}:{json.dumps(tags or {}, sort_keys=True)}'
self.gauges[key] = value
self.metrics.append(
MetricEntry(
name=name,
value=value,
timestamp=datetime.now(),
tags=tags or {},
unit='gauge',
)
)
self._check_buffer_size()
[docs]
def histogram(
self, name: str, value: float, tags: dict[str, str] | None = None
) -> None:
"""Record a histogram value."""
if not self.enabled:
return
with self._lock:
key = f'{name}:{json.dumps(tags or {}, sort_keys=True)}'
self.histograms[key].append(value)
self.metrics.append(
MetricEntry(
name=name,
value=value,
timestamp=datetime.now(),
tags=tags or {},
unit='histogram',
)
)
self._check_buffer_size()
[docs]
def timing(
self, name: str, duration: float, tags: dict[str, str] | None = None
) -> None:
"""Record timing metric in seconds."""
self.histogram(f'{name}.duration', duration, tags)
[docs]
def record_api_call(
self,
method: str,
endpoint: str,
status_code: int,
response_time: float,
bank: str,
error: str | None = None,
) -> None:
"""Record API call metrics."""
if not self.enabled:
return
with self._lock:
self.api_calls.append(
APICallMetric(
method=method,
endpoint=endpoint,
status_code=status_code,
response_time=response_time,
timestamp=datetime.now(),
bank=bank,
error=error,
)
)
# Also record as regular metrics
tags = {
'method': method,
'bank': bank,
'status_code': str(status_code),
'success': str(error is None),
}
self.increment('api_calls_total', tags=tags)
self.histogram('api_call_duration', response_time, tags=tags)
if error:
self.increment('api_errors_total', tags={**tags, 'error': error})
self._check_buffer_size()
def _check_buffer_size(self) -> None:
"""Check if buffer needs flushing."""
total_metrics = len(self.metrics) + len(self.api_calls)
if total_metrics >= self.max_metrics:
self.flush_metrics()
[docs]
def get_summary(self) -> dict[str, Any]:
"""Get metrics summary."""
with self._lock:
uptime = (datetime.now() - self.start_time).total_seconds()
# API call statistics
total_api_calls = len(self.api_calls)
successful_calls = sum(1 for call in self.api_calls if call.error is None)
response_times = [call.response_time for call in self.api_calls]
avg_response_time = (
sum(response_times) / len(response_times) if response_times else 0
)
# Counter totals
counter_summary = dict(self.counters.most_common(10))
return {
'uptime_seconds': uptime,
'total_metrics': len(self.metrics),
'total_api_calls': total_api_calls,
'successful_api_calls': successful_calls,
'error_rate': (total_api_calls - successful_calls) / total_api_calls
if total_api_calls > 0
else 0,
'average_response_time': avg_response_time,
'top_counters': counter_summary,
'memory_usage': self._estimate_memory_usage(),
'last_flush': getattr(self, '_last_flush', None),
}
def _estimate_memory_usage(self) -> dict[str, int]:
"""Estimate memory usage of metrics."""
import sys
return {
'metrics_bytes': sys.getsizeof(self.metrics),
'api_calls_bytes': sys.getsizeof(self.api_calls),
'counters_bytes': sys.getsizeof(self.counters),
'total_estimated': sys.getsizeof(self.metrics)
+ sys.getsizeof(self.api_calls)
+ sys.getsizeof(self.counters),
}
[docs]
def flush_metrics(self, export_path: str | None = None) -> bool:
"""Flush metrics to storage/export."""
if not self.enabled:
return False
with self._lock:
if not self.metrics and not self.api_calls:
return False
# Determine export path
if export_path is None:
export_path = os.getenv('PYPIX_METRICS_EXPORT_PATH')
if export_path:
success = self._export_to_file(export_path)
else:
success = self._export_to_console()
if success:
# Clear flushed metrics (keep last 100 for summary)
self.metrics = self.metrics[-100:]
self.api_calls = self.api_calls[-100:]
self._last_flush = datetime.now()
return success
def _export_to_file(self, file_path: str) -> bool:
"""Export metrics to JSON file."""
try:
export_data = {
'timestamp': datetime.now().isoformat(),
'summary': self.get_summary(),
'metrics': [
{
'name': m.name,
'value': m.value,
'timestamp': m.timestamp.isoformat(),
'tags': m.tags,
'unit': m.unit,
}
for m in self.metrics
],
'api_calls': [
{
'method': call.method,
'endpoint': call.endpoint,
'status_code': call.status_code,
'response_time': call.response_time,
'timestamp': call.timestamp.isoformat(),
'bank': call.bank,
'error': call.error,
}
for call in self.api_calls
],
}
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
# Append to file or create new
if Path(file_path).exists():
with open(file_path, 'a') as f:
f.write('\n' + json.dumps(export_data, ensure_ascii=False))
else:
with open(file_path, 'w') as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f'Failed to export metrics to file: {e}')
return False
def _export_to_console(self) -> bool:
"""Export metrics summary to console."""
try:
summary = self.get_summary()
print('\n=== pypix-api Metrics Summary ===')
print(f'Uptime: {summary["uptime_seconds"]:.1f}s')
print(
f'API Calls: {summary["total_api_calls"]} (Success: {summary["successful_api_calls"]})'
)
print(f'Error Rate: {summary["error_rate"]:.2%}')
print(f'Avg Response Time: {summary["average_response_time"]:.3f}s')
print('================================\n')
return True
except Exception:
return False
[docs]
def clear_metrics(self) -> None:
"""Clear all collected metrics."""
with self._lock:
self.metrics.clear()
self.api_calls.clear()
self.counters.clear()
self.gauges.clear()
self.histograms.clear()
[docs]
def timed_function(metric_name: str | None = None, tags: dict[str, str] | None = None):
"""Decorator to automatically time function execution."""
def decorator(func: Callable) -> Callable:
def wrapper(*args: Any, **kwargs: Any):
start_time = time.time()
name = metric_name or f'function.{func.__name__}'
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
metrics = MetricsCollector()
metrics.timing(name, duration, tags)
metrics.increment(
f'{name}.calls', tags={**(tags or {}), 'success': 'true'}
)
return result
except Exception as e:
duration = time.time() - start_time
metrics = MetricsCollector()
metrics.timing(name, duration, tags)
metrics.increment(
f'{name}.calls',
tags={
**(tags or {}),
'success': 'false',
'error': type(e).__name__,
},
)
metrics.increment(
f'{name}.errors', tags={**(tags or {}), 'error': type(e).__name__}
)
raise
return wrapper
return decorator
# Convenience functions for common metrics
[docs]
def track_bank_operation(bank: str, operation: str) -> PerformanceTracker:
"""Track bank operation performance."""
return PerformanceTracker(
'bank_operation', tags={'bank': bank, 'operation': operation}
)
[docs]
def track_api_call(method: str, endpoint: str) -> PerformanceTracker:
"""Track API call performance."""
return PerformanceTracker('api_call', tags={'method': method, 'endpoint': endpoint})
[docs]
def get_metrics_summary() -> dict[str, Any]:
"""Get current metrics summary."""
return MetricsCollector().get_summary()
[docs]
def export_metrics(file_path: str | None = None) -> bool:
"""Export metrics to file or console."""
return MetricsCollector().flush_metrics(file_path)
[docs]
def clear_metrics() -> None:
"""Clear all metrics."""
MetricsCollector().clear_metrics()
# Export main classes and functions
__all__ = [
'APICallMetric',
'MetricEntry',
'MetricsCollector',
'PerformanceTracker',
'clear_metrics',
'export_metrics',
'get_metrics_summary',
'timed_function',
'track_api_call',
'track_bank_operation',
]