Source code for fastapi_payments.services.payment_service

from typing import Dict, Any, Optional, List
import logging
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession

from ..config.config_schema import PaymentConfig
from ..providers import get_provider
from ..messaging.publishers import PaymentEventPublisher, PaymentEvents
from ..db.repositories import (
    BaseRepository,
    CustomerRepository,
    PaymentRepository,
        PaymentMethodRepository,
    SubscriptionRepository,
    ProductRepository,
    PlanRepository,
)

logger = logging.getLogger(__name__)


[docs] class PaymentService: """Service for payment operations."""
[docs] def __init__( self, config: PaymentConfig, event_publisher: PaymentEventPublisher, db_session=None, ): """ Initialize the payment service. Args: config: Payment configuration event_publisher: Event publisher for notifications db_session: Database session """ self.config = config self.default_provider = config.default_provider self.event_publisher = event_publisher self.db_session = db_session # Initialize provider instances self.providers = {} for provider_name, provider_config in config.providers.items(): self.providers[provider_name] = get_provider( provider_name, provider_config) # Initialize repositories if session is provided if db_session: self.set_db_session(db_session)
[docs] def set_db_session(self, session: AsyncSession): """ Set the database session. Args: session: SQLAlchemy AsyncSession """ self.db_session = session self.customer_repo = CustomerRepository(session) self.payment_repo = PaymentRepository(session) self.subscription_repo = SubscriptionRepository(session) self.product_repo = ProductRepository(session) self.plan_repo = PlanRepository(session) self.payment_method_repo = PaymentMethodRepository(session) # Sync job repository from ..db.repositories.sync_job_repository import SyncJobRepository self.sync_job_repo = SyncJobRepository(session)
[docs] def get_provider(self, provider_name: Optional[str] = None) -> Any: """ Get a payment provider instance. Args: provider_name: Name of the provider to get, or None for default Returns: Provider instance """ provider_name = provider_name or self.default_provider provider = self.providers.get(provider_name) if not provider: raise ValueError(f"Provider {provider_name} not found") return provider
[docs] async def create_customer( self, email: str, name: Optional[str] = None, meta_info: Optional[Dict[str, Any]] = None, provider: Optional[str] = None, address: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Create a customer. Args: email: Customer email name: Customer name meta_info: Additional customer meta_info provider: Name of the provider to use (default is default_provider) Returns: Customer data dictionary """ provider_instance = self.get_provider(provider) customer_data = await provider_instance.create_customer( email=email, name=name, meta_info=meta_info, address=address ) # Save to database if session available if hasattr(self, "customer_repo"): # Persist address into the dedicated column. The repository will # detect and migrate any address embedded in meta_info if needed. customer = await self.customer_repo.create( email=email, name=name, meta_info=meta_info, address=address ) # Link the provider's customer ID to our customer await self.customer_repo.add_provider_customer( customer_id=customer.id, provider=provider or self.default_provider, provider_customer_id=customer_data["provider_customer_id"], ) # Return standardized customer data (include top-level address) return { "id": customer.id, "email": customer.email, "name": customer.name, "created_at": customer.created_at.isoformat(), "provider_customer_id": customer_data["provider_customer_id"], "meta_info": customer.meta_info, "address": customer.address, } else: # No database session, return the provider's response directly return { "id": f"cust_{hash(email) % 10000:04d}", # Generate a fake ID **customer_data, }
[docs] async def ensure_provider_customer(self, customer_id: str, provider: str) -> Dict[str, Any]: """Ensure the customer is registered with the requested provider.""" if not self.db_session: raise RuntimeError("Database session not set") customer_repo = CustomerRepository(self.db_session) customer = await customer_repo.get_with_provider_customers(customer_id) if not customer: raise ValueError(f"Customer not found: {customer_id}") existing = next( (pc for pc in customer.provider_customers if pc.provider == provider), None, ) if existing: return { "provider": existing.provider, "provider_customer_id": existing.provider_customer_id, } provider_instance = self.get_provider(provider) provider_payload = await provider_instance.create_customer( email=customer.email, name=customer.name, meta_info=customer.meta_info, address=customer.address, ) provider_customer_id = provider_payload.get("provider_customer_id") or provider_payload.get("id") if not provider_customer_id: raise ValueError("Provider did not return a customer identifier") await customer_repo.add_provider_customer( customer_id=customer_id, provider=provider, provider_customer_id=provider_customer_id, ) return {"provider": provider, "provider_customer_id": provider_customer_id}
[docs] async def get_customer(self, customer_id: str) -> Optional[Dict[str, Any]]: """ Get customer details. Args: customer_id: Customer ID Returns: Customer data if found, None otherwise """ if not self.db_session: raise RuntimeError("Database session not set") customer_repo = CustomerRepository(self.db_session) customer = await customer_repo.get_with_provider_customers(customer_id) if not customer: return None # Get provider-specific data for each provider provider_data = {} for provider_customer in customer.provider_customers: provider_instance = self.get_provider(provider_customer.provider) try: provider_data[provider_customer.provider] = ( await provider_instance.retrieve_customer( provider_customer.provider_customer_id ) ) except Exception as e: logger.error(f"Error retrieving provider customer: {str(e)}") provider_data[provider_customer.provider] = {"error": str(e)} # Return combined data return { "id": customer.id, "email": customer.email, "name": customer.name, "meta_info": customer.meta_info, "address": customer.address, "created_at": customer.created_at.isoformat(), "updated_at": customer.updated_at.isoformat(), "provider_customers": [ { "provider": pc.provider, "provider_customer_id": pc.provider_customer_id, "provider_data": provider_data.get(pc.provider, {}), } for pc in customer.provider_customers ], }
[docs] async def list_customers( self, *, limit: int = 50, offset: int = 0, search: Optional[str] = None, ) -> List[Dict[str, Any]]: """List customers stored in the service database.""" if not self.db_session: raise RuntimeError("Database session not set") customer_repo = CustomerRepository(self.db_session) customers = await customer_repo.list( limit=limit, offset=offset, search=search, include_provider_customers=True, ) results: List[Dict[str, Any]] = [] for customer in customers: default_provider_link = next( ( pc for pc in customer.provider_customers if pc.provider == self.default_provider ), None, ) results.append( { "id": customer.id, "email": customer.email, "name": customer.name, "meta_info": customer.meta_info, "address": customer.address, "created_at": customer.created_at.isoformat(), "updated_at": customer.updated_at.isoformat() if customer.updated_at else None, "provider_customer_id": ( default_provider_link.provider_customer_id if default_provider_link else None ), "provider_customers": [ { "provider": pc.provider, "provider_customer_id": pc.provider_customer_id, } for pc in customer.provider_customers ], } ) return results
[docs] async def update_customer(self, customer_id: str, **kwargs) -> Dict[str, Any]: """ Update customer details. Args: customer_id: Customer ID **kwargs: Fields to update Returns: Updated customer data """ if not self.db_session: raise RuntimeError("Database session not set") customer_repo = CustomerRepository(self.db_session) customer = await customer_repo.get_with_provider_customers(customer_id) if not customer: raise ValueError(f"Customer not found: {customer_id}") # Update customer in database update_fields = {} # If address provided, store into dedicated address column if "address" in kwargs and kwargs["address"] is not None: update_fields["address"] = kwargs["address"] for field in ["email", "name", "meta_info"]: if field in kwargs and field not in update_fields: update_fields[field] = kwargs[field] if update_fields: logger.debug(f"Updating customer {customer_id} with fields: {update_fields}") customer = await customer_repo.update(customer_id, **update_fields) # Update customer in providers for provider_customer in customer.provider_customers: provider_instance = self.get_provider(provider_customer.provider) try: # When updating providers, pass through any top-level fields. # If address is present, include it in the data payload as 'address'. provider_payload = dict(kwargs) await provider_instance.update_customer( provider_customer.provider_customer_id, provider_payload ) except Exception as e: logger.error(f"Error updating provider customer: {str(e)}") # Publish event await self.event_publisher.publish_event( PaymentEvents.CUSTOMER_UPDATED, {"customer_id": customer.id, "updates": update_fields}, ) # Return updated customer return await self.get_customer(customer_id)
[docs] async def create_payment_method( self, customer_id: str, payment_details: Dict[str, Any], provider: Optional[str] = None, ) -> Dict[str, Any]: """ Create a payment method for a customer. Args: customer_id: Customer ID payment_details: Payment method details provider: Optional provider name Returns: Created payment method data """ if not self.db_session: raise RuntimeError("Database session not set") provider_name = provider or self.default_provider customer_repo = CustomerRepository(self.db_session) # Get provider customer provider_customer = await customer_repo.get_provider_customer( customer_id, provider_name ) if not provider_customer: raise ValueError( f"Customer not found for provider {provider_name}") # Create payment method in provider provider_instance = self.get_provider(provider_name) payment_method = await provider_instance.create_payment_method( provider_customer.provider_customer_id, payment_details ) # Persist payment method server-side if we have a session/repo if hasattr(self, "payment_method_repo"): # Normalize card metadata if present card = payment_method.get("card") or {} pm = await self.payment_method_repo.create( customer_id=customer_id, provider=provider_name, provider_payment_method_id=payment_method["payment_method_id"], mandate_id=payment_method.get("mandate_id"), is_default=payment_method.get("is_default", False), card_brand=card.get("brand"), card_last4=card.get("last4"), card_exp_month=card.get("exp_month"), card_exp_year=card.get("exp_year"), meta_info=payment_method.get("meta_info", {}), ) # Replace returned payload with DB-backed values payment_method["stored_id"] = pm.id payment_method["is_default"] = pm.is_default # Ensure mandate_id (if provider returned one) is present if pm.mandate_id: payment_method["mandate_id"] = pm.mandate_id # Publish event await self.event_publisher.publish_event( PaymentEvents.PAYMENT_METHOD_CREATED, { "customer_id": customer_id, "provider": provider_name, "payment_method_id": payment_method["payment_method_id"], }, ) # Return payment method data (persisted if we have DB storage) result = { "id": payment_method.get("payment_method_id") or payment_method.get("stored_id"), "provider": provider_name, "type": payment_method.get("type"), "is_default": payment_method.get("is_default", False), "card": payment_method.get("card"), "created_at": datetime.utcnow().isoformat(), } # Include mandate_id if the provider returned it (e.g., from a SetupIntent) if payment_method.get("mandate_id"): result["mandate_id"] = payment_method.get("mandate_id") return result
[docs] async def create_setup_intent( self, customer_id: str, provider: Optional[str] = None, usage: Optional[str] = None ) -> Dict[str, Any]: """ Create a SetupIntent (or provider equivalent) for the given customer so a client-side flow (3DS) can be completed before saving a payment method. Returns a dict that includes `id` and `client_secret` keys when supported by the provider. """ if not self.db_session: raise RuntimeError("Database session not set") provider_name = provider or self.default_provider customer_repo = CustomerRepository(self.db_session) provider_customer = await customer_repo.get_provider_customer( customer_id, provider_name ) if not provider_customer: raise ValueError( f"Customer not found for provider {provider_name}" ) provider_instance = self.get_provider(provider_name) result = await provider_instance.create_setup_intent( provider_customer.provider_customer_id, usage=usage ) return result
[docs] async def list_payment_methods( self, customer_id: str, provider: Optional[str] = None ) -> List[Dict[str, Any]]: """ List payment methods for a customer. Args: customer_id: Customer ID provider: Optional provider name Returns: List of payment methods """ if not self.db_session: raise RuntimeError("Database session not set") provider_name = provider or self.default_provider customer_repo = CustomerRepository(self.db_session) # Get provider customer provider_customer = await customer_repo.get_provider_customer( customer_id, provider_name ) if not provider_customer: raise ValueError( f"Customer not found for provider {provider_name}") # Prefer returning server-side stored payment methods when available if hasattr(self, "payment_method_repo"): stored_methods = await self.payment_method_repo.list_for_customer( customer_id, provider=provider_name ) return [ { "id": m.provider_payment_method_id, "provider": m.provider, "type": "card" if m.card_brand else "unknown", "is_default": bool(m.is_default), "card": { "brand": m.card_brand, "last4": m.card_last4, "exp_month": m.card_exp_month, "exp_year": m.card_exp_year, } if m.card_brand else None, "mandate_id": getattr(m, "mandate_id", None), } for m in stored_methods ] # Fallback: query provider directly when server storage isn't available provider_instance = self.get_provider(provider_name) payment_methods = await provider_instance.list_payment_methods( provider_customer.provider_customer_id ) # Return formatted payment methods return [ { "id": pm["payment_method_id"], "provider": provider_name, "type": pm.get("type"), "is_default": pm.get("is_default", False), "card": pm.get("card"), "mandate_id": pm.get("mandate_id"), } for pm in payment_methods ]
[docs] async def update_payment_method( self, customer_id: str, method_id: str, provider: Optional[str] = None, **fields: Any, ) -> Dict[str, Any]: """Update a stored payment method's attributes. Supported updates include setting is_default, or meta_info. """ if not self.db_session: raise RuntimeError("Database session not set") pm_repo = self.payment_method_repo pm = await pm_repo.get_by_id(method_id) # If caller passed a provider_payment_method_id (not the DB id), try lookup if not pm: pm = await pm_repo.get_by_provider_method_id(provider, method_id) if not pm or pm.customer_id != customer_id: raise ValueError("Payment method not found for customer") # Handle set_default specially: update provider and DB if fields.get("is_default"): # Use repository helper to mark default await pm_repo.set_default(customer_id, method_id) # If provider supports setting default at customer-level, attempt it try: provider_instance = self.get_provider(provider or pm.provider) # update the provider customer's invoice_settings.default_payment_method # Find provider_customer link customer_repo = CustomerRepository(self.db_session) provider_customer = await customer_repo.get_provider_customer( customer_id, provider or pm.provider ) if provider_customer: await provider_instance.update_customer( provider_customer.provider_customer_id, {"invoice_settings": {"default_payment_method": pm.provider_payment_method_id}}, ) except Exception: # Provider may not support or we don't want failures to block; log and continue logger.exception("Failed to set provider-side default payment method") # Update any DB-stored meta or card fields update_fields = {k: v for k, v in fields.items() if k != "is_default"} if update_fields: updated = await pm_repo.update(method_id, **update_fields) pm = updated return { "id": pm.provider_payment_method_id, "provider": pm.provider, "is_default": bool(pm.is_default), "card": { "brand": pm.card_brand, "last4": pm.card_last4, "exp_month": pm.card_exp_month, "exp_year": pm.card_exp_year, } if pm.card_brand else None, "mandate_id": getattr(pm, "mandate_id", None), }
[docs] async def delete_payment_method( self, customer_id: str, method_id: str, provider: Optional[str] = None ) -> Dict[str, Any]: """Delete (detach) a stored payment method both in provider (if supported) and DB.""" if not self.db_session: raise RuntimeError("Database session not set") pm_repo = self.payment_method_repo pm = await pm_repo.get_by_id(method_id) if not pm: pm = await pm_repo.get_by_provider_method_id(provider, method_id) if not pm or pm.customer_id != customer_id: raise ValueError("Payment method not found for customer") provider_name = provider or pm.provider # Try to delete/detach in provider try: provider_instance = self.get_provider(provider_name) if hasattr(provider_instance, "delete_payment_method"): await provider_instance.delete_payment_method(pm.provider_payment_method_id) except Exception: logger.exception("Error deleting payment method at provider; continuing to remove DB record") # Remove from DB deleted = await pm_repo.delete(method_id) # Publish event await self.event_publisher.publish_event( PaymentEvents.PAYMENT_METHOD_DELETED, {"customer_id": customer_id, "provider": provider_name, "payment_method_id": method_id}, ) return {"deleted": bool(deleted), "id": method_id}
[docs] async def set_default_payment_method( self, customer_id: str, method_id: str, provider: Optional[str] = None ) -> Dict[str, Any]: """Mark a stored payment method as the default for the customer. Updates both DB (via repository.set_default) and attempts to update the provider's customer default payment settings where supported. """ if not self.db_session: raise RuntimeError("Database session not set") pm_repo = self.payment_method_repo pm = await pm_repo.get_by_id(method_id) if not pm: pm = await pm_repo.get_by_provider_method_id(provider, method_id) if not pm or pm.customer_id != customer_id: raise ValueError("Payment method not found for customer") # Set DB default updated = await pm_repo.set_default(customer_id, method_id) # Set default on provider (customer invoice settings) if possible try: provider_instance = self.get_provider(provider or pm.provider) customer_repo = CustomerRepository(self.db_session) provider_customer = await customer_repo.get_provider_customer( customer_id, provider or pm.provider ) if provider_customer: await provider_instance.update_customer( provider_customer.provider_customer_id, {"invoice_settings": {"default_payment_method": pm.provider_payment_method_id}}, ) except Exception: logger.exception("Failed to set provider-side default payment method") return { "id": updated.provider_payment_method_id, "is_default": bool(updated.is_default), "provider": updated.provider, }
[docs] async def create_product( self, name: str, description: Optional[str] = None, provider: Optional[str] = None, meta_info: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Create a new product. Args: name: Product name description: Optional product description provider: Optional provider name meta_info: Optional product meta_info Returns: Created product data """ if not self.db_session: raise RuntimeError("Database session not set") provider_name = provider or self.default_provider provider_instance = self.get_provider(provider_name) # Create product in provider provider_product = await provider_instance.create_product( name=name, description=description, meta_info=meta_info ) # Create product in database product_repo = ProductRepository(self.db_session) product = await product_repo.create( name=name, description=description, meta_info={ **(meta_info or {}), "provider_product_id": provider_product["provider_product_id"], "provider": provider_name, }, ) # Return combined data return { "id": product.id, "name": product.name, "description": product.description, "active": product.active, "provider_product_id": provider_product["provider_product_id"], "provider": provider_name, "created_at": product.created_at.isoformat(), }
[docs] async def list_products( self, *, limit: int = 50, offset: int = 0, ) -> List[Dict[str, Any]]: """Return products stored in the local database.""" if not self.db_session: raise RuntimeError("Database session not set") products = await self.product_repo.list(limit=limit, offset=offset) results: List[Dict[str, Any]] = [] for product in products: meta_info = product.meta_info or {} results.append( { "id": product.id, "name": product.name, "description": product.description, "active": product.active, "provider_product_id": meta_info.get("provider_product_id", ""), "provider": meta_info.get("provider", self.default_provider), "created_at": product.created_at.isoformat(), "meta_info": meta_info or None, } ) return results
[docs] async def create_plan( self, product_id: str, name: str, pricing_model: str, amount: float, description: Optional[str] = None, currency: str = "USD", billing_interval: Optional[str] = None, billing_interval_count: Optional[int] = None, trial_period_days: Optional[int] = None, provider: Optional[str] = None, meta_info: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Create a plan for a product. Args: product_id: Product ID name: Plan name pricing_model: Pricing model (subscription, usage_based, etc.) amount: Base amount description: Optional plan description currency: Currency code billing_interval: Billing interval (day, week, month, year) billing_interval_count: Number of intervals between billings trial_period_days: Optional trial period in days provider: Optional provider name meta_info: Optional plan meta_info Returns: Created plan data """ if not self.db_session: raise RuntimeError("Database session not set") provider_name = provider or self.default_provider provider_instance = self.get_provider(provider_name) # Get product product_repo = ProductRepository(self.db_session) product = await product_repo.get_by_id(product_id) if not product: raise ValueError(f"Product not found: {product_id}") # Get provider product ID from meta_info provider_product_id = product.meta_info.get("provider_product_id") if not provider_product_id: raise ValueError( f"Provider product ID not found for product {product_id}") # Create price in provider provider_price = await provider_instance.create_price( product_id=provider_product_id, amount=amount, currency=currency, interval=billing_interval, interval_count=billing_interval_count, meta_info={ **(meta_info or {}), "name": name, "pricing_model": pricing_model, }, ) # Create plan in database plan_repo = PlanRepository(self.db_session) plan = await plan_repo.create( product_id=product_id, name=name, description=description, pricing_model=pricing_model, amount=amount, currency=currency, billing_interval=billing_interval, billing_interval_count=billing_interval_count, trial_period_days=trial_period_days, is_active=True, meta_info={ **(meta_info or {}), "provider": provider_name, "provider_price_id": provider_price["provider_price_id"], }, ) # Return combined data return { "id": plan.id, "product_id": plan.product_id, "name": plan.name, "description": plan.description, "pricing_model": plan.pricing_model, "amount": plan.amount, "currency": plan.currency, "billing_interval": plan.billing_interval, "billing_interval_count": plan.billing_interval_count, "trial_period_days": plan.trial_period_days, "provider": provider_name, "provider_price_id": provider_price["provider_price_id"], "created_at": plan.created_at.isoformat(), }
[docs] async def list_plans( self, *, product_id: Optional[str] = None, limit: int = 50, offset: int = 0, ) -> List[Dict[str, Any]]: """Return plans, optionally filtered by product.""" if not self.db_session: raise RuntimeError("Database session not set") plans = await self.plan_repo.list( product_id=product_id, limit=limit, offset=offset ) results: List[Dict[str, Any]] = [] for plan in plans: meta_info = plan.meta_info or {} pricing_model = ( plan.pricing_model.value if hasattr(plan.pricing_model, "value") else plan.pricing_model ) results.append( { "id": plan.id, "product_id": plan.product_id, "name": plan.name, "description": plan.description, "pricing_model": pricing_model, "amount": plan.amount, "currency": plan.currency, "billing_interval": plan.billing_interval, "billing_interval_count": plan.billing_interval_count, "trial_period_days": plan.trial_period_days, "provider": meta_info.get("provider", self.default_provider), "provider_price_id": meta_info.get("provider_price_id", ""), "created_at": plan.created_at.isoformat(), "meta_info": meta_info or None, } ) return results
[docs] async def create_subscription( self, customer_id: str, plan_id: str, quantity: int = 1, trial_period_days: Optional[int] = None, meta_info: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Create a subscription for a customer. Args: customer_id: Customer ID plan_id: Plan ID quantity: Number of units/seats trial_period_days: Optional trial period in days meta_info: Optional subscription meta_info Returns: Created subscription data """ if not self.db_session: raise RuntimeError("Database session not set") # Get plan and customer plan_repo = PlanRepository(self.db_session) customer_repo = CustomerRepository(self.db_session) plan = await plan_repo.get_by_id(plan_id) customer = await customer_repo.get_with_provider_customers(customer_id) if not plan: raise ValueError(f"Plan not found: {plan_id}") if not customer: raise ValueError(f"Customer not found: {customer_id}") # Get provider from plan meta_info provider_name = plan.meta_info.get("provider") provider_price_id = plan.meta_info.get("provider_price_id") if not provider_name or not provider_price_id: raise ValueError(f"Provider info not found for plan {plan_id}") # Get provider customer provider_customer = await customer_repo.get_provider_customer( customer_id, provider_name ) if not provider_customer: raise ValueError( f"Customer not found for provider {provider_name}") # Create subscription in provider provider_instance = self.get_provider(provider_name) provider_subscription = await provider_instance.create_subscription( provider_customer_id=provider_customer.provider_customer_id, price_id=provider_price_id, quantity=quantity, trial_period_days=trial_period_days or plan.trial_period_days, meta_info=meta_info, ) # Create subscription in database subscription_repo = SubscriptionRepository(self.db_session) current_period_start = ( datetime.fromisoformat( provider_subscription["current_period_start"].replace( "Z", "+00:00") ) if isinstance(provider_subscription["current_period_start"], str) else provider_subscription["current_period_start"] ) current_period_end = None if provider_subscription.get("current_period_end"): current_period_end = ( datetime.fromisoformat( provider_subscription["current_period_end"].replace( "Z", "+00:00") ) if isinstance(provider_subscription["current_period_end"], str) else provider_subscription["current_period_end"] ) subscription = await subscription_repo.create( customer_id=customer_id, plan_id=plan_id, provider=provider_name, provider_subscription_id=provider_subscription["provider_subscription_id"], status=provider_subscription["status"], quantity=quantity, current_period_start=current_period_start, current_period_end=current_period_end, cancel_at_period_end=provider_subscription.get( "cancel_at_period_end", False ), meta_info={ **(meta_info or {}), "provider_data": {"items": provider_subscription.get("items", [])}, }, ) # Publish event await self.event_publisher.publish_event( PaymentEvents.SUBSCRIPTION_CREATED, { "subscription_id": subscription.id, "customer_id": customer_id, "plan_id": plan_id, "provider": provider_name, "provider_subscription_id": provider_subscription[ "provider_subscription_id" ], }, ) # Return combined data return { "id": subscription.id, "customer_id": subscription.customer_id, "plan_id": subscription.plan_id, "provider": subscription.provider, "provider_subscription_id": subscription.provider_subscription_id, "status": subscription.status, "quantity": subscription.quantity, "current_period_start": subscription.current_period_start.isoformat(), "current_period_end": ( subscription.current_period_end.isoformat() if subscription.current_period_end else None ), "cancel_at_period_end": subscription.cancel_at_period_end, "created_at": subscription.created_at.isoformat(), }
[docs] async def get_subscription(self, subscription_id: str) -> Optional[Dict[str, Any]]: """ Get subscription details. Args: subscription_id: Subscription ID Returns: Subscription data if found, None otherwise """ if not self.db_session: raise RuntimeError("Database session not set") subscription_repo = SubscriptionRepository(self.db_session) subscription = await subscription_repo.get_with_plan(subscription_id) if not subscription: return None # Get provider subscription data provider_instance = self.get_provider(subscription.provider) try: provider_subscription = await provider_instance.retrieve_subscription( subscription.provider_subscription_id ) except Exception as e: logger.error(f"Error retrieving provider subscription: {str(e)}") provider_subscription = {"error": str(e)} # Return combined data return { "id": subscription.id, "customer_id": subscription.customer_id, "plan_id": subscription.plan_id, "plan_name": subscription.plan.name if subscription.plan else None, "provider": subscription.provider, "provider_subscription_id": subscription.provider_subscription_id, "status": subscription.status, "quantity": subscription.quantity, "current_period_start": subscription.current_period_start.isoformat(), "current_period_end": ( subscription.current_period_end.isoformat() if subscription.current_period_end else None ), "cancel_at_period_end": subscription.cancel_at_period_end, "created_at": subscription.created_at.isoformat(), "provider_data": provider_subscription, }
[docs] async def list_subscriptions( self, *, customer_id: Optional[str] = None, status: Optional[str] = None, limit: int = 50, offset: int = 0, ) -> List[Dict[str, Any]]: """Return subscriptions filtered by optional criteria.""" if not self.db_session: raise RuntimeError("Database session not set") subscriptions = await self.subscription_repo.list( customer_id=customer_id, status=status, limit=limit, offset=offset, include_plan=True, ) results: List[Dict[str, Any]] = [] for subscription in subscriptions: provider_data = None if subscription.meta_info: provider_data = subscription.meta_info.get("provider_data") results.append( { "id": subscription.id, "customer_id": subscription.customer_id, "plan_id": subscription.plan_id, "plan_name": subscription.plan.name if subscription.plan else None, "provider": subscription.provider, "provider_subscription_id": subscription.provider_subscription_id, "status": subscription.status, "quantity": subscription.quantity, "current_period_start": subscription.current_period_start.isoformat() if subscription.current_period_start else None, "current_period_end": subscription.current_period_end.isoformat() if subscription.current_period_end else None, "cancel_at_period_end": subscription.cancel_at_period_end, "created_at": subscription.created_at.isoformat(), "provider_data": provider_data, } ) return results
[docs] async def cancel_subscription( self, subscription_id: str, cancel_at_period_end: bool = True ) -> Dict[str, Any]: """ Cancel a subscription. Args: subscription_id: Subscription ID cancel_at_period_end: Whether to cancel at the end of the current period Returns: Updated subscription data """ if not self.db_session: raise RuntimeError("Database session not set") subscription_repo = SubscriptionRepository(self.db_session) subscription = await subscription_repo.get_by_id(subscription_id) if not subscription: raise ValueError(f"Subscription not found: {subscription_id}") # Cancel subscription in provider provider_instance = self.get_provider(subscription.provider) provider_result = await provider_instance.cancel_subscription( subscription.provider_subscription_id, cancel_at_period_end ) # Update subscription in database await subscription_repo.update( subscription_id, status="canceled" if not cancel_at_period_end else "active", cancel_at_period_end=( cancel_at_period_end if cancel_at_period_end else False ), canceled_at=datetime.utcnow() if not cancel_at_period_end else None, ) # Publish event await self.event_publisher.publish_event( PaymentEvents.SUBSCRIPTION_CANCELED, { "subscription_id": subscription_id, "cancel_at_period_end": cancel_at_period_end, "canceled_at": datetime.utcnow().isoformat(), }, ) # Return updated subscription return await self.get_subscription(subscription_id)
[docs] async def process_payment( self, customer_id: str, amount: float, currency: str, payment_method_id: Optional[str] = None, description: Optional[str] = None, meta_info: Optional[Dict[str, Any]] = None, mandate_id: Optional[str] = None, provider: Optional[str] = None, ) -> Dict[str, Any]: """ Process a one-time payment. Args: customer_id: Customer ID amount: Payment amount currency: Currency code payment_method_id: Optional payment method ID description: Optional payment description meta_info: Optional payment meta_info Returns: Payment data """ if not self.db_session: raise RuntimeError("Database session not set") customer_repo = CustomerRepository(self.db_session) customer = await customer_repo.get_with_provider_customers(customer_id) if not customer: raise ValueError(f"Customer not found: {customer_id}") # Determine provider provider_name = provider or self.default_provider # If provider not explicitly requested and payment method encodes it, respect that if not provider and payment_method_id and ":" in payment_method_id: provider_name, payment_method_id = payment_method_id.split(":", 1) # Get provider customer provider_customer = await customer_repo.get_provider_customer( customer_id, provider_name ) if not provider_customer: raise ValueError( f"Customer not found for provider {provider_name}") # Enrich meta info with customer context for providers that need it request_meta_info = dict(meta_info or {}) provider_meta_payload = dict(request_meta_info) provider_meta_payload["customer_context"] = { "id": customer.id, "email": customer.email, "name": customer.name, "meta_info": customer.meta_info or {}, } # Process payment with provider provider_instance = self.get_provider(provider_name) provider_payment = await provider_instance.process_payment( amount=amount, currency=currency, provider_customer_id=provider_customer.provider_customer_id, payment_method_id=payment_method_id, description=description, mandate_id=mandate_id, meta_info=provider_meta_payload, ) combined_meta_info = dict(request_meta_info) if provider_payment.get("meta_info"): combined_meta_info.setdefault("provider_data", {}) combined_meta_info["provider_data"][provider_name] = provider_payment["meta_info"] stored_meta_info = combined_meta_info or None # Create payment in database payment_repo = PaymentRepository(self.db_session) payment = await payment_repo.create( customer_id=customer_id, provider=provider_name, provider_payment_id=provider_payment["provider_payment_id"], amount=amount, currency=currency, status=provider_payment["status"], payment_method=payment_method_id, error_message=provider_payment.get("error_message"), meta_info=( {**stored_meta_info, "description": description} if stored_meta_info else ({"description": description} if description else None) ), ) # Publish event event_type = ( PaymentEvents.PAYMENT_SUCCEEDED if provider_payment["status"] == "COMPLETED" else PaymentEvents.PAYMENT_CREATED ) await self.event_publisher.publish_event( event_type, { "payment_id": payment.id, "customer_id": customer_id, "amount": amount, "currency": currency, "status": provider_payment["status"], "provider": provider_name, "provider_payment_id": provider_payment["provider_payment_id"], }, ) # Return payment data return { "id": payment.id, "customer_id": payment.customer_id, "amount": payment.amount, "currency": payment.currency, "status": payment.status, "payment_method": payment.payment_method, "error_message": payment.error_message, "provider": payment.provider, "provider_payment_id": payment.provider_payment_id, "created_at": payment.created_at.isoformat(), "meta_info": payment.meta_info, }
[docs] async def sync_resources( self, resources: Optional[List[str]] = None, provider: Optional[str] = None, filters: Optional[Dict[str, Any]] = None, limit: int = 100, offset: int = 0, ) -> Dict[str, Any]: """Synchronize local DB state with provider(s). This is best-effort: for each requested resource we attempt to fetch authoritative data from the configured provider(s) and update local rows where possible. If a provider does not implement the expected retrieval method for a resource, the resource is skipped for that provider. Args: resources: list of resource names to sync (customers, products, plans, payments, subscriptions, payment_methods). If None, all supported resources are synced. provider: optional provider name to limit sync to one provider. filters: resource-specific filters e.g. {'customer_id': '...'} limit/offset: pagination for resource listing. Returns: A dict summarizing how many items were examined/updated and any errors encountered for each resource. """ if not self.db_session: raise RuntimeError("Database session not set") supported = [ "customers", "products", "plans", "payments", "subscriptions", "payment_methods", ] to_sync = set(resources or supported) result: Dict[str, Any] = {r: {"synced": 0, "updated": 0, "created": 0, "errors": []} for r in to_sync if r in supported} filters = filters or {} # CUSTOMERS if "customers" in to_sync: try: cust_repo = CustomerRepository(self.db_session) if filters.get("customer_id"): customers = [await cust_repo.get_with_provider_customers(filters["customer_id"])] else: customers = await cust_repo.list(limit=limit, offset=offset, include_provider_customers=True) for customer in customers: if not customer: continue result["customers"]["synced"] += 1 # For each provider link, try to retrieve provider data for pc in customer.provider_customers: if provider and pc.provider != provider: continue provider_instance = self.get_provider(pc.provider) retrieve = getattr(provider_instance, "retrieve_customer", None) if not callable(retrieve): continue try: pdata = await retrieve(pc.provider_customer_id) # Persist provider data into customer.meta_info.provider_data meta = dict(customer.meta_info or {}) provider_map = dict(meta.get("provider_data") or {}) provider_map[pc.provider] = pdata meta["provider_data"] = provider_map await cust_repo.update(customer.id, meta_info=meta) result["customers"]["updated"] += 1 except Exception as e: result["customers"]["errors"].append(str(e)) except Exception as e: result["customers"]["errors"].append(str(e)) # PRODUCTS if "products" in to_sync: try: prod_repo = ProductRepository(self.db_session) if filters.get("product_id"): prods = [await prod_repo.get_by_id(filters["product_id"])] else: prods = await prod_repo.list(limit=limit, offset=offset) for p in prods: if not p: continue result["products"]["synced"] += 1 meta = p.meta_info or {} prov = provider or meta.get("provider") prov_pid = meta.get("provider_product_id") if not prov or not prov_pid: continue provider_instance = self.get_provider(prov) retrieve = getattr(provider_instance, "retrieve_product", None) if not callable(retrieve): # provider doesn't expose retrieve_product: skip continue try: pdata = await retrieve(prov_pid) provider_map = dict(meta.get("provider_data") or {}) provider_map[prov] = pdata meta["provider_data"] = provider_map await prod_repo.update(p.id, meta_info=meta) result["products"]["updated"] += 1 except Exception as e: result["products"]["errors"].append(str(e)) except Exception as e: result["products"]["errors"].append(str(e)) # PLANS if "plans" in to_sync: try: plan_repo = PlanRepository(self.db_session) if filters.get("plan_id"): plans = [await plan_repo.get_by_id(filters["plan_id"])] else: plans = await plan_repo.list(limit=limit, offset=offset) for pl in plans: if not pl: continue result["plans"]["synced"] += 1 meta = pl.meta_info or {} prov = provider or meta.get("provider") prov_price_id = meta.get("provider_price_id") if not prov or not prov_price_id: continue provider_instance = self.get_provider(prov) # Try a couple of possible method names retrieve = getattr(provider_instance, "retrieve_price", None) or getattr(provider_instance, "retrieve_plan", None) if not callable(retrieve): continue try: pdata = await retrieve(prov_price_id) provider_map = dict(meta.get("provider_data") or {}) provider_map[prov] = pdata meta["provider_data"] = provider_map await plan_repo.update(pl.id, meta_info=meta) result["plans"]["updated"] += 1 except Exception as e: result["plans"]["errors"].append(str(e)) except Exception as e: result["plans"]["errors"].append(str(e)) # SUBSCRIPTIONS if "subscriptions" in to_sync: try: sub_repo = SubscriptionRepository(self.db_session) if filters.get("subscription_id"): subs = [await sub_repo.get_by_id(filters["subscription_id"])] else: subs = await sub_repo.list(limit=limit, offset=offset, include_plan=True) for s in subs: if not s: continue result["subscriptions"]["synced"] += 1 if provider and s.provider != provider: continue provider_instance = self.get_provider(s.provider) retrieve = getattr(provider_instance, "retrieve_subscription", None) if not callable(retrieve): continue try: pdata = await retrieve(s.provider_subscription_id) # Update local subscription fields where appropriate update_fields: Dict[str, Any] = {} status = pdata.get("status") if status: update_fields["status"] = status cps = pdata.get("current_period_start") or pdata.get("current_period_start_iso") cpe = pdata.get("current_period_end") or pdata.get("current_period_end_iso") if cps: try: update_fields["current_period_start"] = ( datetime.fromisoformat(cps.replace("Z", "+00:00")) if isinstance(cps, str) else cps ) except Exception: pass if cpe: try: update_fields["current_period_end"] = ( datetime.fromisoformat(cpe.replace("Z", "+00:00")) if isinstance(cpe, str) else cpe ) except Exception: pass if "cancel_at_period_end" in pdata: update_fields["cancel_at_period_end"] = pdata.get("cancel_at_period_end") if update_fields: await sub_repo.update(s.id, **update_fields) result["subscriptions"]["updated"] += 1 except Exception as e: result["subscriptions"]["errors"].append(str(e)) except Exception as e: result["subscriptions"]["errors"].append(str(e)) # PAYMENTS if "payments" in to_sync: try: pay_repo = PaymentRepository(self.db_session) if filters.get("payment_id"): pays = [await pay_repo.get_by_id(filters["payment_id"])] else: pays = await pay_repo.list(limit=limit, offset=offset) for p in pays: if not p: continue result["payments"]["synced"] += 1 if provider and p.provider != provider: continue provider_instance = self.get_provider(p.provider) retrieve = getattr(provider_instance, "retrieve_payment", None) if not callable(retrieve): continue try: pdata = await retrieve(p.provider_payment_id) update_fields = {} if pdata.get("status"): update_fields["status"] = pdata.get("status") if pdata.get("meta_info"): update_fields["meta_info"] = {**(p.meta_info or {}), "provider_data": {p.provider: pdata.get("meta_info")}} if update_fields: await pay_repo.update(p.id, **update_fields) result["payments"]["updated"] += 1 except Exception as e: result["payments"]["errors"].append(str(e)) except Exception as e: result["payments"]["errors"].append(str(e)) # PAYMENT METHODS (attempt reconcile provider-side saved methods) if "payment_methods" in to_sync: try: cust_repo = CustomerRepository(self.db_session) pm_repo = PaymentMethodRepository(self.db_session) # If a customer_id filter is present we only fetch for that customer if filters.get("customer_id"): customers = [await cust_repo.get_with_provider_customers(filters["customer_id"])] else: customers = await cust_repo.list(limit=limit, offset=offset, include_provider_customers=True) for customer in customers: if not customer: continue # For each provider-link, fetch provider's payment methods for pc in customer.provider_customers: if provider and pc.provider != provider: continue prov = pc.provider provider_instance = self.get_provider(prov) list_pm = getattr(provider_instance, "list_payment_methods", None) if not callable(list_pm): continue try: pms = await list_pm(pc.provider_customer_id) result["payment_methods"]["synced"] += len(pms) for pm in pms: # pm should include payment_method_id and card data pid = pm.get("payment_method_id") if not pid: continue existing = await pm_repo.get_by_provider_method_id(prov, pid) if existing: # update metadata await pm_repo.update(existing.id, meta_info={**(existing.meta_info or {}), "provider_data": pm}) result["payment_methods"]["updated"] += 1 else: # create new DB-backed method card = pm.get("card") or {} await pm_repo.create( customer_id=customer.id, provider=prov, provider_payment_method_id=pid, mandate_id=pm.get("mandate_id"), is_default=pm.get("is_default", False), card_brand=card.get("brand"), card_last4=card.get("last4"), card_exp_month=card.get("exp_month"), card_exp_year=card.get("exp_year"), meta_info={"provider_data": pm}, ) result["payment_methods"]["created"] += 1 except Exception as e: result["payment_methods"]["errors"].append(str(e)) except Exception as e: result["payment_methods"]["errors"].append(str(e)) # Convert result payload items into simpler dicts # (ensure errors lists exist) for k, v in result.items(): if "errors" not in v or v["errors"] is None: v["errors"] = [] return {"summary": result}
[docs] async def create_sync_job(self, resources: Optional[List[str]] = None, provider: Optional[str] = None, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Create a SyncJob record and return its basic details. Actual work is performed by execute_sync_job which is expected to be scheduled as a background task by the API layer. """ if not self.db_session: raise RuntimeError("Database session not set") job = await self.sync_job_repo.create(resources=resources, provider=provider, filters=filters) return {"id": job.id, "status": job.status, "created_at": job.created_at.isoformat(), "updated_at": job.updated_at.isoformat()}
[docs] async def execute_sync_job(self, job_id: str): """Run sync_resources for a previously created job and persist results. This method will set job status to 'running', invoke sync_resources, and finally store the result and mark job 'completed' or 'failed'. """ # For background execution we cannot rely on the request-scoped DB # session. Create a fresh session from the repositories' sessionmaker # and run the job inside it. from ..db.repositories import _sessionmaker # type: ignore if _sessionmaker is None: raise RuntimeError("Database not initialized; cannot run job") async with _sessionmaker() as session: # Use a fresh PaymentService instance tied to this session so # repositories use a valid session for the duration of the job. svc = PaymentService(self.config, self.event_publisher, session) job = await svc.sync_job_repo.get_by_id(job_id) if not job: raise ValueError(f"Sync job not found: {job_id}") # mark running await svc.sync_job_repo.update_status(job_id, "running") try: res = await svc.sync_resources(resources=job.resources, provider=job.provider, filters=job.filters) await svc.sync_job_repo.update_status(job_id, "completed", result=res) except Exception as e: await svc.sync_job_repo.update_status(job_id, "failed", result={"error": str(e)}) raise
[docs] async def refund_payment( self, payment_id: str, amount: Optional[float] = None ) -> Dict[str, Any]: """ Refund a payment, partially or fully. Args: payment_id: Payment ID amount: Optional refund amount (full refund if not specified) Returns: Updated payment data """ if not self.db_session: raise RuntimeError("Database session not set") payment_repo = PaymentRepository(self.db_session) payment = await payment_repo.get_by_id(payment_id) if not payment: raise ValueError(f"Payment not found: {payment_id}") # Refund payment in provider provider_instance = self.get_provider(payment.provider) refund = await provider_instance.refund_payment( payment.provider_payment_id, amount ) # Update payment in database refund_amount = amount or payment.amount await payment_repo.update( payment_id, status=( "refunded" if refund_amount >= payment.amount else "partially_refunded" ), refunded_amount=refund_amount, ) # Publish event await self.event_publisher.publish_event( PaymentEvents.PAYMENT_REFUNDED, { "payment_id": payment_id, "refund_amount": refund_amount, "currency": payment.currency, "provider": payment.provider, "provider_refund_id": refund.get("provider_refund_id"), }, ) # Return updated payment payment = await payment_repo.get_by_id(payment_id) return { "id": payment.id, "customer_id": payment.customer_id, "amount": payment.amount, "refunded_amount": payment.refunded_amount, "currency": payment.currency, "status": payment.status, "provider": payment.provider, "provider_payment_id": payment.provider_payment_id, "created_at": payment.created_at.isoformat(), }
[docs] async def list_payments( self, *, customer_id: Optional[str] = None, status: Optional[str] = None, limit: int = 50, offset: int = 0, ) -> List[Dict[str, Any]]: """Return payments filtered by optional criteria.""" if not self.db_session: raise RuntimeError("Database session not set") payments = await self.payment_repo.list( customer_id=customer_id, status=status, limit=limit, offset=offset, include_customer=True, ) results: List[Dict[str, Any]] = [] for payment in payments: status_value = ( payment.status.value if hasattr(payment.status, "value") else payment.status ) results.append( { "id": payment.id, "customer_id": payment.customer_id, "amount": payment.amount, "refunded_amount": payment.refunded_amount, "currency": payment.currency, "status": status_value, "payment_method": payment.payment_method, "error_message": payment.error_message, "provider": payment.provider, "provider_payment_id": payment.provider_payment_id, "created_at": payment.created_at.isoformat(), "meta_info": payment.meta_info, } ) return results
[docs] async def record_usage( self, subscription_id: str, quantity: float, timestamp: Optional[str] = None, description: Optional[str] = None, ) -> Dict[str, Any]: """ Record usage for a subscription. Args: subscription_id: Subscription ID quantity: Usage quantity timestamp: Optional usage timestamp description: Optional usage description Returns: Usage record data """ if not self.db_session: raise RuntimeError("Database session not set") subscription_repo = SubscriptionRepository(self.db_session) subscription = await subscription_repo.get_by_id(subscription_id) if not subscription: raise ValueError(f"Subscription not found: {subscription_id}") # Record usage with provider provider_instance = self.get_provider(subscription.provider) usage = await provider_instance.record_usage( subscription.provider_subscription_id, quantity, timestamp ) # Record usage in database from ..db.models import UsageRecord usage_repo = BaseRepository(UsageRecord, self.db_session) usage_record = await usage_repo.create( subscription_id=subscription_id, quantity=quantity, timestamp=( datetime.fromisoformat( timestamp) if timestamp else datetime.utcnow() ), description=description, ) # Publish event await self.event_publisher.publish_event( PaymentEvents.USAGE_RECORDED, { "subscription_id": subscription_id, "quantity": quantity, "timestamp": usage_record.timestamp.isoformat(), "description": description, }, ) # Return usage data return { "id": usage_record.id, "subscription_id": usage_record.subscription_id, "quantity": usage_record.quantity, "timestamp": usage_record.timestamp.isoformat(), "description": usage_record.description, "provider_usage_id": usage.get("id"), }
[docs] async def handle_webhook( self, provider: str, payload: Any, signature: Optional[str] = None ) -> Dict[str, Any]: """ Handle webhooks from payment providers. Args: provider: Provider name payload: Webhook payload signature: Optional webhook signature Returns: Processed webhook data """ if provider not in self.providers: raise ValueError(f"Payment provider '{provider}' not found") provider_instance = self.get_provider(provider) # Process webhook with provider result = await provider_instance.webhook_handler(payload, signature) # Handle webhook based on standardized event type event_type = result.get("standardized_event_type") if event_type == "payment.succeeded": # Handle successful payment # Implementation depends on business logic pass elif event_type == "payment.failed": # Handle failed payment pass elif event_type == "subscription.created": # Handle subscription creation pass elif event_type == "subscription.updated": # Handle subscription update pass elif event_type == "subscription.canceled": # Handle subscription cancellation pass # Publish webhook event await self.event_publisher.publish_event( f"webhook.{provider}.{event_type}", { "provider": provider, "event_type": event_type, "data": result.get("data"), }, ) return result
# Add the dependency injection function
[docs] def get_payment_service(): """Dependency to get payment service instance.""" # This is a placeholder - the actual implementation will be provided through DI raise NotImplementedError( "Payment service should be injected through FastAPI DI.")