diff --git a/python-sdk-master.zip b/python-sdk-master.zip new file mode 100644 index 00000000..0bd61cf4 Binary files /dev/null and b/python-sdk-master.zip differ diff --git a/topgg/ratelimiter.py b/topgg/ratelimiter.py index 028a98ee..e6ab6b9e 100644 --- a/topgg/ratelimiter.py +++ b/topgg/ratelimiter.py @@ -1,110 +1,63 @@ -# -*- coding: utf-8 -*- - -# The MIT License (MIT) - -# Copyright (c) 2021 Assanali Mukhanov - -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the "Software"), -# to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, -# and/or sell copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. - -import asyncio -import collections -from datetime import datetime +from collections.abc import Iterable from types import TracebackType -from typing import Any, Awaitable, Callable, List, Optional, Type +from collections import deque +from time import time +import asyncio +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Optional +# Shared reusable __aexit__ logic +async def shared_aexit(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None: + async with self.__lock: + self.__calls.append(time()) + while self._timespan >= self.__period: + self.__calls.popleft() -class AsyncRateLimiter: - """ - Provides rate limiting for an operation with a configurable number of requests for a time period. - """ - __lock: asyncio.Lock - callback: Optional[Callable[[float], Awaitable[Any]]] - max_calls: int - period: float - calls: collections.deque +class Ratelimiter: + """Handles ratelimits for a specific endpoint.""" - def __init__( - self, - max_calls: int, - period: float = 1.0, - callback: Optional[Callable[[float], Awaitable[Any]]] = None, - ): - if period <= 0: - raise ValueError("Rate limiting period should be > 0") - if max_calls <= 0: - raise ValueError("Rate limiting number of calls should be > 0") - self.calls = collections.deque() + __slots__ = ('__lock', '__max_calls', '__period', '__calls') - self.period = period - self.max_calls = max_calls - self.callback = callback + def __init__(self, max_calls: int, period: float = 1.0): + self.__calls = deque() + self.__period = period + self.__max_calls = max_calls self.__lock = asyncio.Lock() - async def __aenter__(self) -> "AsyncRateLimiter": + async def __aenter__(self) -> 'Ratelimiter': async with self.__lock: - if len(self.calls) >= self.max_calls: - until = datetime.utcnow().timestamp() + self.period - self._timespan - if self.callback: - asyncio.ensure_future(self.callback(until)) - sleep_time = until - datetime.utcnow().timestamp() + if len(self.__calls) >= self.__max_calls: + until = time() + self.__period - self._timespan + sleep_time = until - time() if sleep_time > 0: await asyncio.sleep(sleep_time) - return self - - async def __aexit__( - self, - exc_type: Type[BaseException], - exc_val: BaseException, - exc_tb: TracebackType, - ) -> None: - async with self.__lock: - # Store the last operation timestamp. - self.calls.append(datetime.utcnow().timestamp()) + return self - while self._timespan >= self.period: - self.calls.popleft() + # Assign shared logic + __aexit__ = shared_aexit @property def _timespan(self) -> float: - return self.calls[-1] - self.calls[0] + return self.__calls[-1] - self.__calls[0] if len(self.__calls) >= 2 else 0.0 + +class Ratelimiters: + """Handles ratelimits for multiple endpoints.""" -class AsyncRateLimiterManager: - rate_limiters: List[AsyncRateLimiter] + __slots__ = ('__ratelimiters',) - def __init__(self, rate_limiters: List[AsyncRateLimiter]): - self.rate_limiters = rate_limiters + def __init__(self, ratelimiters: Iterable[Ratelimiter]): + self.__ratelimiters = tuple(ratelimiters) - async def __aenter__(self) -> "AsyncRateLimiterManager": - [await manager.__aenter__() for manager in self.rate_limiters] + async def __aenter__(self) -> 'Ratelimiters': + for ratelimiter in self.__ratelimiters: + await ratelimiter.__aenter__() return self - async def __aexit__( - self, - exc_type: Type[BaseException], - exc_val: BaseException, - exc_tb: TracebackType, - ) -> None: + async def __aexit__(self, exc_type, exc_val, exc_tb): await asyncio.gather( - *[ - manager.__aexit__(exc_type, exc_val, exc_tb) - for manager in self.rate_limiters - ] + *(r.__aexit__(exc_type, exc_val, exc_tb) for r in self.__ratelimiters) ) diff --git a/topgg/webhook.py b/topgg/webhook.py index 4b94ec2b..b7365e46 100644 --- a/topgg/webhook.py +++ b/topgg/webhook.py @@ -1,368 +1,122 @@ -# -*- coding: utf-8 -*- - -# The MIT License (MIT) - -# Copyright (c) 2021 Assanali Mukhanov - -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the "Software"), -# to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, -# and/or sell copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. - -__all__ = [ - "endpoint", - "BoundWebhookEndpoint", - "WebhookEndpoint", - "WebhookManager", - "WebhookType", -] - -import enum -import typing as t - -import aiohttp +from collections.abc import Awaitable, Callable +from typing import Any, Optional, Union +from inspect import isawaitable +from urllib import parse from aiohttp import web -from topgg.errors import TopGGException +RawCallback = Callable[[web.Request], Awaitable[web.StreamResponse]] +OnVoteCallback = Callable[["Vote"], Any] +OnVoteDecorator = Callable[[OnVoteCallback], RawCallback] -from .data import DataContainerMixin -from .types import BotVoteData, GuildVoteData -if t.TYPE_CHECKING: - from aiohttp.web import Request, StreamResponse +class Vote: + """A dispatched Top.gg vote event.""" -T = t.TypeVar("T", bound="WebhookEndpoint") -_HandlerT = t.Callable[["Request"], t.Awaitable["StreamResponse"]] + __slots__ = ("receiver_id", "voter_id", "is_server", "is_test", "is_weekend", "query") + def __init__(self, json: dict[str, Any]) -> None: + self.receiver_id = int(json.get("bot", json.get("guild"))) + self.voter_id = int(json["user"]) + self.is_server = "guild" in json + self.is_test = json["type"] == "test" + self.is_weekend = bool(json.get("isWeekend")) -class WebhookType(enum.Enum): - """An enum that represents the type of an endpoint.""" + query_str = json.get("query") + self.query = { + k: v[0] for k, v in parse.parse_qs(parse.urlsplit(query_str).query).items() + } if query_str else {} - BOT = enum.auto() - """Marks the endpoint as a bot webhook.""" + def __repr__(self) -> str: + return f"" - GUILD = enum.auto() - """Marks the endpoint as a guild webhook.""" - -class WebhookManager(DataContainerMixin): +class Webhooks: """ - A class for managing Top.gg webhooks. + Receive events from the Top.gg servers. + + :param auth: The default password to use. + :param port: The default port to use. """ - __app: web.Application - _webserver: web.TCPSite - _is_closed: bool - __slots__ = ("__app", "_webserver", "_is_running") + __slots__ = ("__app", "__server", "__default_auth", "__default_port", "__running") - def __init__(self) -> None: - super().__init__() + def __init__(self, auth: Optional[str] = None, port: Optional[int] = None) -> None: self.__app = web.Application() - self._is_running = False - - @t.overload - def endpoint(self, endpoint_: None = None) -> "BoundWebhookEndpoint": - ... - - @t.overload - def endpoint(self, endpoint_: "WebhookEndpoint") -> "WebhookManager": - ... - - def endpoint(self, endpoint_: t.Optional["WebhookEndpoint"] = None) -> t.Any: - """Helper method that returns a WebhookEndpoint object. - - Args: - `endpoint_` (:obj:`typing.Optional` [ :obj:`WebhookEndpoint` ]) - The endpoint to add. - - Returns: - :obj:`typing.Union` [ :obj:`WebhookManager`, :obj:`BoundWebhookEndpoint` ]: - An instance of :obj:`WebhookManager` if endpoint was provided, - otherwise :obj:`BoundWebhookEndpoint`. - - Raises: - :obj:`~.errors.TopGGException` - If the endpoint is lacking attributes. - """ - if endpoint_: - if not hasattr(endpoint_, "_callback"): - raise TopGGException("endpoint missing callback.") - - if not hasattr(endpoint_, "_type"): - raise TopGGException("endpoint missing type.") - - if not hasattr(endpoint_, "_route"): - raise TopGGException("endpoint missing route.") - - self.app.router.add_post( - endpoint_._route, - self._get_handler( - endpoint_._type, endpoint_._auth, endpoint_._callback - ), - ) - return self - - return BoundWebhookEndpoint(manager=self) - - async def start(self, port: int) -> None: - """Runs the webhook. - - Args: - port (int) - The port to run the webhook on. - """ - runner = web.AppRunner(self.__app) - await runner.setup() - self._webserver = web.TCPSite(runner, "0.0.0.0", port) - await self._webserver.start() - self._is_running = True + self.__server = None + self.__default_auth = auth + self.__default_port = port + self.__running = False - @property - def is_running(self) -> bool: - """Returns whether or not the webserver is running.""" - return self._is_running + def __repr__(self) -> str: + return f"" - @property - def app(self) -> web.Application: - """Returns the internal web application that handles webhook requests. + def on_vote( + self, + route: str, + auth: Optional[str] = None, + callback: Optional[OnVoteCallback] = None + ) -> Union[OnVoteCallback, OnVoteDecorator]: + if not isinstance(route, str): + raise TypeError("Missing route argument.") - Returns: - :class:`aiohttp.web.Application`: - The internal web application. - """ - return self.__app + effective_auth = auth or self.__default_auth + if not effective_auth: + raise TypeError("Missing password.") - async def close(self) -> None: - """Stops the webhook.""" - await self._webserver.stop() - self._is_running = False - - def _get_handler( - self, type_: WebhookType, auth: str, callback: t.Callable[..., t.Any] - ) -> _HandlerT: - async def _handler(request: aiohttp.web.Request) -> web.Response: - if request.headers.get("Authorization", "") != auth: - return web.Response(status=401, text="Unauthorized") - - data = await request.json() - await self._invoke_callback( - callback, - (BotVoteData if type_ is WebhookType.BOT else GuildVoteData)(**data), - ) - return web.Response(status=200, text="OK") - - return _handler + def decorator(inner_callback: OnVoteCallback) -> RawCallback: + async def handler(request: web.Request) -> web.Response: + if request.headers.get("Authorization") != effective_auth: + return web.Response(status=401, text="Unauthorized") + result = inner_callback(Vote(await request.json())) + if isawaitable(result): + await result -CallbackT = t.Callable[..., t.Any] + return web.Response(status=200, text="OK") + self.__app.router.add_post(route, handler) + return handler -class WebhookEndpoint: - """ - A helper class to setup webhook endpoint. - """ - - __slots__ = ("_callback", "_auth", "_route", "_type") + if callback: + decorator(callback) + return callback - def __init__(self) -> None: - self._auth = "" + return decorator - def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any: - return self._callback(*args, **kwargs) + async def start(self, port: Optional[int] = None) -> None: + if self.running: + return - def type(self: T, type_: WebhookType) -> T: - """Sets the type of this endpoint. + port = port or self.__default_port + if port is None: + raise TypeError("Missing port.") - Args: - `type_` (:obj:`WebhookType`) - The type of the endpoint. + runner = web.AppRunner(self.__app) + await runner.setup() - Returns: - :obj:`WebhookEndpoint` - """ - self._type = type_ - return self + self.__server = web.TCPSite(runner, "0.0.0.0", port) + await self.__server.start() - def route(self: T, route_: str) -> T: - """ - Sets the route of this endpoint. + self.__running = True - Args: - `route_` (str) - The route of this endpoint. + async def close(self) -> None: + if not self.running: + return - Returns: - :obj:`WebhookEndpoint` - """ - self._route = route_ - return self + await self.__server.stop() + self.__running = False - def auth(self: T, auth_: str) -> T: - """ - Sets the auth of this endpoint. + @property + def running(self) -> bool: + return self.__running - Args: - `auth_` (str) - The auth of this endpoint. + @property + def app(self) -> web.Application: + return self.__app - Returns: - :obj:`WebhookEndpoint` - """ - self._auth = auth_ + async def __aenter__(self) -> "Webhooks": + await self.start() return self - @t.overload - def callback(self, callback_: None) -> t.Callable[[CallbackT], CallbackT]: - ... - - @t.overload - def callback(self: T, callback_: CallbackT) -> T: - ... - - def callback(self, callback_: t.Any = None) -> t.Any: - """ - Registers a vote callback, called whenever this endpoint receives POST requests. - - The callback can be either sync or async. - This method can be used as a decorator or a decorator factory. - - :Example: - .. code-block:: python - - import topgg - - webhook_manager = topgg.WebhookManager() - endpoint = ( - topgg.WebhookEndpoint() - .type(topgg.WebhookType.BOT) - .route("/dblwebhook") - .auth("youshallnotpass") - ) - - # The following are valid. - endpoint.callback(lambda vote_data: print("Receives a vote!", vote_data)) - - # Used as decorator, the decorated function will become the WebhookEndpoint object. - @endpoint.callback - def endpoint(vote_data: topgg.BotVoteData): - ... - - # Used as decorator factory, the decorated function will still be the function itself. - @endpoint.callback() - def on_vote(vote_data: topgg.BotVoteData): - ... - - webhook_manager.endpoint(endpoint) - """ - if callback_ is not None: - self._callback = callback_ - return self - - return self.callback - - -class BoundWebhookEndpoint(WebhookEndpoint): - """ - A WebhookEndpoint with a WebhookManager bound to it. - - You can instantiate this object using the :meth:`WebhookManager.endpoint` method. - - :Example: - .. code-block:: python - - import topgg - - webhook_manager = ( - topgg.WebhookManager() - .endpoint() - .type(topgg.WebhookType.BOT) - .route("/dblwebhook") - .auth("youshallnotpass") - ) - - # The following are valid. - endpoint.callback(lambda vote_data: print("Receives a vote!", vote_data)) - - # Used as decorator, the decorated function will become the BoundWebhookEndpoint object. - @endpoint.callback - def endpoint(vote_data: topgg.BotVoteData): - ... - - # Used as decorator factory, the decorated function will still be the function itself. - @endpoint.callback() - def on_vote(vote_data: topgg.BotVoteData): - ... - - endpoint.add_to_manager() - """ - - __slots__ = ("manager",) - - def __init__(self, manager: WebhookManager): - super().__init__() - self.manager = manager - - def add_to_manager(self) -> WebhookManager: - """ - Adds this endpoint to the webhook manager. - - Returns: - :obj:`WebhookManager` - - Raises: - :obj:`errors.TopGGException`: - If the object lacks attributes. - """ - self.manager.endpoint(self) - return self.manager - - -def endpoint( - route: str, type: WebhookType, auth: str = "" -) -> t.Callable[[t.Callable[..., t.Any]], WebhookEndpoint]: - """ - A decorator factory for instantiating WebhookEndpoint. - - Args: - route (str) - The route for the endpoint. - type (WebhookType) - The type of the endpoint. - auth (str) - The auth for the endpoint. - - Returns: - :obj:`typing.Callable` [[ :obj:`typing.Callable` [..., :obj:`typing.Any` ]], :obj:`WebhookEndpoint` ]: - The actual decorator. - - :Example: - .. code-block:: python - - import topgg - - @topgg.endpoint("/dblwebhook", WebhookType.BOT, "youshallnotpass") - async def on_vote( - vote_data: topgg.BotVoteData, - # database here is an injected data - database: Database = topgg.data(Database), - ): - ... - """ - - def decorator(func: t.Callable[..., t.Any]) -> WebhookEndpoint: - return WebhookEndpoint().route(route).type(type).auth(auth).callback(func) - - return decorator + async def __aexit__(self, *_: Any, **__: Any) -> None: + await self.close()