Source code for terminusgps.aws.notifications

import asyncio
import os
from collections.abc import Sequence
from contextlib import AsyncExitStack

from aiobotocore.session import AioSession
from loguru import logger


[docs] class AsyncNotificationManager: """Asyncronously delivers notifications using `AWS End User Messaging <https://docs.aws.amazon.com/end-user-messaging/>`_.""" def __init__( self, origin_pool_arn: str | None = None, configuration_set: str | None = None, max_sms_price: str = "0.20", max_voice_price: str = "0.20", region_name: str = "us-east-1", debug_enabled: bool = False, logger_level: int = 10, logger_days: int = 10, ) -> None: """ Sets attributes on the notification manager. :param origin_pool_arn: A phone pool ARN for notification dispatch. Default is :envvar:`AWS_MESSAGING_ORIGIN_POOL`. :type origin_pool_arn: :py:obj:`str` | :py:obj:`None` :param configuration_set: An end-user messaging configuration set ARN. Default is :envvar:`AWS_MESSAGING_CONFIGURATION`. :type configuration_set: :py:obj:`str` | :py:obj:`None` :param max_sms_price: Max price to spend on a single SMS message. Default is :py:obj:`"0.20"`. :type max_sms_price: :py:obj:`str` :param max_voice_price: Max price to spend per minute on a single voice message. Default is :py:obj:`"0.20"`. :type max_voice_price: :py:obj:`str` :param region_name: An AWS region name used to open an AWS client. Default is :py:obj:`"us-east-1"`. :type region_name: :py:obj:`str` :param debug_enabled: Whether or not to enable debug mode. Default is :py:obj:`False` :type debug_enabled: :py:obj:`False` :param logger_level: A logger level. Default is :py:obj:`int` :type logger_level: :py:obj:`int` :param logger_days: How long in days logging data should be saved. :type logger_days: :py:obj:`int` :raises ValueError: If ``origin_pool_arn`` wasn't provided and :envvar:`AWS_MESSAGING_ORIGIN_POOL` wasn't set. :raises ValueError: If ``configuration_set`` wasn't provided and :envvar:`AWS_MESSAGING_CONFIGURATION` wasn't set. :returns: Nothing. :rtype: :py:obj:`None` """ if not origin_pool_arn and not os.getenv("AWS_MESSAGING_ORIGIN_POOL"): raise ValueError(f"'origin_pool_arn' is required, got '{origin_pool_arn}'.") if not configuration_set and not os.getenv("AWS_MESSAGING_CONFIGURATION"): raise ValueError( f"'configuration_set' is required, got '{configuration_set}'." ) self._origin_pool_arn = origin_pool_arn or os.getenv( "AWS_MESSAGING_ORIGIN_POOL", "" ) self._configuration_set = configuration_set or os.getenv( "AWS_MESSAGING_CONFIGURATION", "" ) self._exit_stack = AsyncExitStack() self._pinpoint_client = None self._region_name = region_name self._max_sms_price = max_sms_price self._max_voice_price = max_voice_price self._debug = debug_enabled logger.add( f"logs/{self.__class__.__name__}.log", level=logger_level, retention=f"{logger_days} days", diagnose=debug_enabled, ) async def __aenter__(self) -> "AsyncNotificationManager": """ Creates asyncronous clients. :returns: The notification manager. :rtype: :py:obj:`~terminusgps.aws.notifications.AsyncNotificationManager` """ session = AioSession() self._pinpoint_client = await self._exit_stack.enter_async_context( session.create_client("pinpoint-sms-voice-v2", region_name=self.region_name) ) return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """ Destroys asyncronous clients. :param exc_type: Exception type. :param exc_val: Exception value. :param exc_tb: Exception traceback. :returns: Nothing. :rtype: :py:obj:`None` """ await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) @property def region_name(self) -> str: """ An AWS region name. :type: :py:obj:`str` """ return self._region_name @property def configuration_set(self) -> str: """ The configuration set to use for messaging. :type: :py:obj:`str` """ return self._configuration_set @property def origin_pool(self) -> str: """ The origination pool to use for messaging. :type: :py:obj:`str` """ return self._origin_pool_arn @property def max_sms_price(self) -> str: """ The max allowed price per sms message. :type: :py:obj:`str` :value: :py:obj:`"0.20"` """ return self._max_sms_price @property def max_voice_price(self) -> str: """ The max allowed price per voice message. :type: :py:obj:`str` :value: :py:obj:`"0.20"` """ return self._max_voice_price @property def debug(self) -> bool: """ Whether or not debug mode is enabled. :type: :py:obj:`bool` :value: :py:obj:`False` """ return self._debug
[docs] async def send_sms( self, phone: str, message: str, ttl: int = 300, dry_run: bool = False, feedback: bool = False, ) -> dict[str, str]: """ Texts ``message`` to ``phone`` via sms. :param phone: A destination phone number. :type phone: :py:obj:`str` :param message: A message body. :type message: :py:obj:`str` :param ttl: Time to live in ms. Default is :py:obj:`300`. :type ttl: :py:obj:`int` :param dry_run: Whether or not to execute the message as a dry run. Default is :py:obj:`False`. :type dry_run: :py:obj:`bool` :param feedback: Whether or not to include message feedback in the response. Default is :py:obj:`False`. :type feedback: :py:obj:`bool` :raises AssertionError: If :py:attr:`_pinpoint_client` wasn't set. :returns: An sms message response. :rtype: :py:obj:`dict` """ assert self._pinpoint_client, "Asyncronous client wasn't set." logger.debug(f"Texting '{message}' to '{phone}'...") return await self._pinpoint_client.send_text_message( **{ "DestinationPhoneNumber": phone, "OriginationIdentity": self.origin_pool, "MessageBody": message, "MessageType": "TRANSACTIONAL", "ConfigurationSetName": self.configuration_set, "MaxPrice": self.max_sms_price, "TimeToLive": ttl, "DryRun": dry_run or self.debug, "MessageFeedbackEnabled": feedback, } )
[docs] async def send_voice( self, phone: str, message: str, ttl: int = 300, voice_id: str = "Joanna", dry_run: bool = False, feedback: bool = False, ) -> dict[str, str]: """ Calls ``phone`` and reads ``message`` aloud. :param phone: A destination phone number. :type phone: :py:obj:`str` :param message: A message body. :type message: :py:obj:`str` :param ttl: Time to live in ms. Default is :py:obj:`300`. :type ttl: :py:obj:`int` :param voice_id: A voice id to use for speech synthesis. Default is :py:obj:`"Joanna"`. :type voice_id: :py:obj:`str` :param dry_run: Whether or not to execute the message as a dry run. Default is :py:obj:`False`. :type dry_run: :py:obj:`bool` :param feedback: Whether or not to include message feedback in the response. Default is :py:obj:`False`. :type feedback: :py:obj:`bool` :raises AssertionError: If :py:attr:`_pinpoint_client` wasn't set. :returns: A voice message response. :rtype: :py:obj:`dict` """ assert self._pinpoint_client, "Asyncronous client wasn't set." logger.debug(f"Reading '{message}' aloud to '{phone}'...") return await self._pinpoint_client.send_voice_message( **{ "DestinationPhoneNumber": phone, "OriginationIdentity": self.origin_pool, "MessageBody": message, "MessageBodyTextType": "TEXT", "VoiceId": voice_id.upper(), "ConfigurationSetName": self.configuration_set, "MaxPricePerMinute": self.max_voice_price, "TimeToLive": ttl, "DryRun": dry_run or self.debug, "MessageFeedbackEnabled": feedback, } )
[docs] async def batch_send_sms( self, phones: Sequence[str], message: str, **kwargs ) -> list[dict[str, str]]: """ Sends ``message`` to all phone numbers in ``phones`` via sms. :param phones: A sequence of phone numbers. :type phones: :py:obj:`~collections.abc.Sequence` :param message: A message body. :type message: :py:obj:`str` :param kwargs: Additional keyword arguments passed to :py:meth:`~terminusgps.aws.notifications.AsyncNotificationManager.send_sms`. :raises AssertionError: If :py:attr:`_pinpoint_client` wasn't set. :returns: A list of sms message responses. :rtype: :py:obj:`dict` .. seealso:: :py:meth:`~terminusgps.aws.notifications.AsyncNotificationManager.send_sms` for details on available keyword arguments. """ assert self._pinpoint_client, "Asyncronous client wasn't set." return await asyncio.gather( *[self.send_sms(phone=phone, message=message, **kwargs) for phone in phones] )
[docs] async def batch_send_voice( self, phones: Sequence[str], message: str, **kwargs ) -> list[dict[str, str]]: """ Calls each number in ``phones`` and reads ``message`` aloud. :param phone: A sequence of phone numbers. :type phone: :py:obj:`~collections.abc.Sequence` :param message: A message body. :type message: :py:obj:`str` :param kwargs: Additional keyword arguments passed to :py:meth:`~terminusgps.aws.notifications.AsyncNotificationManager.send_voice`. :raises AssertionError: If :py:attr:`_pinpoint_client` wasn't set. :returns: A voice message response. :rtype: :py:obj:`dict` .. seealso:: :py:meth:`~terminusgps.aws.notifications.AsyncNotificationManager.send_voice` for details on available keyword arguments. """ assert self._pinpoint_client, "Asyncronous client wasn't set." return await asyncio.gather( *[ self.send_voice(phone=phone, message=message, **kwargs) for phone in phones ] )