Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added python-sdk-master.zip
Binary file not shown.
127 changes: 40 additions & 87 deletions topgg/ratelimiter.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,63 @@
# -*- coding: utf-8 -*-

# The MIT License (MIT)

# Copyright (c) 2021 Assanali Mukhanov

# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import asyncio
import collections
from datetime import datetime
from collections.abc import Iterable
from types import TracebackType
from typing import Any, Awaitable, Callable, List, Optional, Type
from collections import deque
from time import time
import asyncio
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Optional

# Shared reusable __aexit__ logic
async def shared_aexit(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
async with self.__lock:
self.__calls.append(time())
while self._timespan >= self.__period:
self.__calls.popleft()

class AsyncRateLimiter:
"""
Provides rate limiting for an operation with a configurable number of requests for a time period.
"""

__lock: asyncio.Lock
callback: Optional[Callable[[float], Awaitable[Any]]]
max_calls: int
period: float
calls: collections.deque
class Ratelimiter:
"""Handles ratelimits for a specific endpoint."""

def __init__(
self,
max_calls: int,
period: float = 1.0,
callback: Optional[Callable[[float], Awaitable[Any]]] = None,
):
if period <= 0:
raise ValueError("Rate limiting period should be > 0")
if max_calls <= 0:
raise ValueError("Rate limiting number of calls should be > 0")
self.calls = collections.deque()
__slots__ = ('__lock', '__max_calls', '__period', '__calls')

self.period = period
self.max_calls = max_calls
self.callback = callback
def __init__(self, max_calls: int, period: float = 1.0):
self.__calls = deque()
self.__period = period
self.__max_calls = max_calls
self.__lock = asyncio.Lock()

async def __aenter__(self) -> "AsyncRateLimiter":
async def __aenter__(self) -> 'Ratelimiter':
async with self.__lock:
if len(self.calls) >= self.max_calls:
until = datetime.utcnow().timestamp() + self.period - self._timespan
if self.callback:
asyncio.ensure_future(self.callback(until))
sleep_time = until - datetime.utcnow().timestamp()
if len(self.__calls) >= self.__max_calls:
until = time() + self.__period - self._timespan
sleep_time = until - time()
if sleep_time > 0:
await asyncio.sleep(sleep_time)
return self

async def __aexit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
async with self.__lock:
# Store the last operation timestamp.
self.calls.append(datetime.utcnow().timestamp())
return self

while self._timespan >= self.period:
self.calls.popleft()
# Assign shared logic
__aexit__ = shared_aexit

@property
def _timespan(self) -> float:
return self.calls[-1] - self.calls[0]
return self.__calls[-1] - self.__calls[0] if len(self.__calls) >= 2 else 0.0


class Ratelimiters:
"""Handles ratelimits for multiple endpoints."""

class AsyncRateLimiterManager:
rate_limiters: List[AsyncRateLimiter]
__slots__ = ('__ratelimiters',)

def __init__(self, rate_limiters: List[AsyncRateLimiter]):
self.rate_limiters = rate_limiters
def __init__(self, ratelimiters: Iterable[Ratelimiter]):
self.__ratelimiters = tuple(ratelimiters)

async def __aenter__(self) -> "AsyncRateLimiterManager":
[await manager.__aenter__() for manager in self.rate_limiters]
async def __aenter__(self) -> 'Ratelimiters':
for ratelimiter in self.__ratelimiters:
await ratelimiter.__aenter__()
return self

async def __aexit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.gather(
*[
manager.__aexit__(exc_type, exc_val, exc_tb)
for manager in self.rate_limiters
]
*(r.__aexit__(exc_type, exc_val, exc_tb) for r in self.__ratelimiters)
)
Loading
Loading