Source code for pypix_api.auth.oauth2

import os
import time
from typing import Any, BinaryIO

import requests
from dotenv import load_dotenv

from pypix_api.auth.mtls import get_session_with_mtls


[docs] class OAuth2Client: """Cliente OAuth2 para autenticação com a API"""
[docs] def __init__( self, token_url: str, client_id: str | None = None, cert: str | None = None, pvk: str | None = None, cert_pfx: str | bytes | BinaryIO | None = None, pwd_pfx: str | None = None, sandbox_mode: bool = False, ) -> None: """Inicializa o cliente OAuth2 Args: token_url: URL de autenticação OAuth2 client_id: Client ID para autenticação OAuth2 cert: Path para o certificado PEM (opcional) pvk: Path para a chave privada PEM (opcional) cert_pfx: Path ou dados do certificado PFX (opcional) pwd_pfx: Senha do certificado PFX (opcional) sandbox_mode: Se True, não requer certificado (default: False) """ load_dotenv() self.client_id: str | None = client_id or os.getenv('CLIENT_ID') self.cert: str | None = cert or os.getenv('CERT') self.pvk: str | None = pvk or os.getenv('PVK') self.cert_pfx: str | bytes | BinaryIO | None = cert_pfx or os.getenv('CERT_PFX') self.pwd_pfx: str | None = pwd_pfx or os.getenv('PWD_PFX') self.token_url: str = token_url # URL de autenticação OAuth2 self.token_cache: dict[str, dict[str, Any]] = {} # Cache de tokens por escopo self.session: requests.Session = requests.Session() self.sandbox_mode = sandbox_mode if not self.sandbox_mode: self.session = get_session_with_mtls( cert=self.cert, pvk=self.pvk, cert_pfx=self.cert_pfx, pwd_pfx=self.pwd_pfx, sandbox_mode=self.sandbox_mode, )
[docs] def get_token(self, scope: str | None = None) -> str: """Obtém ou renova o token de acesso para o escopo especificado Args: scope: Escopo(s) necessário(s) para a API. Exemplos: - Cobrança por Boleto: "boletos_inclusao boletos_consulta boletos_alteracao webhooks_alteracao webhooks_consulta webhooks_inclusao" - Conta Corrente: "cco_consulta cco_transferencias openid" - Recebimento no PIX: "cob.write cob.read cobv.write cobv.read lotecobv.write lotecobv.read pix.write pix.read webhook.read webhook.write payloadlocation.write payloadlocation.read" Returns: str: Token de acesso válido para o escopo solicitado """ # Usa escopo padrão se não especificado (para compatibilidade) if scope is None: scope = 'cco_extrato cco_consulta' # Verifica se já existe token válido para este escopo if scope in self.token_cache and not self._is_token_expired(scope): return self.token_cache[scope]['access_token'] token_data = { 'grant_type': 'client_credentials', 'client_id': self.client_id, 'scope': scope, } response = self.session.post(self.token_url, data=token_data) response.raise_for_status() token_info = response.json() token_info['expires_at'] = time.time() + token_info['expires_in'] # Armazena o token no cache por escopo self.token_cache[scope] = token_info return token_info['access_token']
def _is_token_expired(self, scope: str) -> bool: """Verifica se o token para o escopo especificado expirou""" if scope not in self.token_cache or 'expires_at' not in self.token_cache[scope]: return True return ( time.time() >= self.token_cache[scope]['expires_at'] - 60 ) # 60s de margem