33 lines
1.1 KiB
Python
33 lines
1.1 KiB
Python
import time
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.responses import JSONResponse
|
|
|
|
# Maximum number of requests a single client IP may make within one window.
LIMIT = 60

# Length of the counting window, in seconds.
WINDOW = 60

# How long, in seconds, an IP stays blocked once it exceeds LIMIT.
BLOCK_TIME = 5 * 60

# In-memory per-IP state, keyed by client address. Each value is a dict with
# "count" (requests seen in the current window), "first_request" (timestamp
# at which the current window started) and "blocked_until" (timestamp at
# which the block ends, or None when the IP is not blocked).
ip_store = {}
|
|
|
|
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Fixed-window, per-client-IP rate limiter.

    Each client address may make at most ``LIMIT`` requests per ``WINDOW``
    seconds. The request that exceeds the limit blocks the address for
    ``BLOCK_TIME`` seconds; every request made while the block is active is
    answered with HTTP 429 and never reaches the wrapped application.

    State is kept in the module-level ``ip_store`` dict, so it is in-memory
    and shared by every instance of this middleware in the process.
    """

    # Bucket used when the connection exposes no peer address.
    _UNKNOWN_CLIENT = "unknown"

    # Timestamp of the last stale-entry sweep. Kept on the class because the
    # store it cleans (``ip_store``) is shared across instances as well.
    _last_sweep = 0.0

    async def dispatch(self, request, call_next):
        """Count the request against its client IP and forward or reject it.

        Args:
            request: The incoming Starlette request.
            call_next: Awaitable that passes the request down the stack.

        Returns:
            The downstream response, or a 429 ``JSONResponse`` when the
            client is blocked or has just exceeded ``LIMIT``.
        """
        # ``request.client`` can be None; fall back to one shared bucket
        # instead of failing with AttributeError.
        client = request.client
        ip = client.host if client is not None else self._UNKNOWN_CLIENT
        now = time.time()

        self._sweep_stale(now)

        entry = ip_store.get(ip)
        if entry is None:
            entry = {"count": 0, "first_request": now, "blocked_until": None}

        blocked_until = entry["blocked_until"]
        if blocked_until and now < blocked_until:
            return self._reject(
                "Too many requests. Try again later.",
                blocked_until - now,
            )

        # Start a fresh window once the previous one has elapsed.
        if now - entry["first_request"] > WINDOW:
            entry["count"] = 0
            entry["first_request"] = now

        entry["count"] += 1
        ip_store[ip] = entry

        if entry["count"] > LIMIT:
            entry["blocked_until"] = now + BLOCK_TIME
            # The message text assumes BLOCK_TIME is five minutes.
            return self._reject(
                "Rate limit exceeded. Blocked for 5 minutes.",
                BLOCK_TIME,
            )

        return await call_next(request)

    @classmethod
    def _sweep_stale(cls, now):
        """Drop entries whose window has elapsed and whose block is over.

        Such entries would be reset on their next request anyway, so removing
        them does not change limiting decisions; it only keeps ``ip_store``
        from growing by one entry per address ever seen. The sweep runs at
        most once per ``WINDOW`` seconds to keep the per-request cost low.
        """
        if now - cls._last_sweep < WINDOW:
            return
        cls._last_sweep = now

        # Collect keys first: a dict must not be resized while iterating it.
        stale = [
            ip
            for ip, entry in ip_store.items()
            if now - entry["first_request"] > WINDOW
            and not (entry["blocked_until"] and now < entry["blocked_until"])
        ]
        for ip in stale:
            del ip_store[ip]

    @staticmethod
    def _reject(message, retry_after):
        """Build a 429 response that advertises the wait via ``Retry-After``."""
        seconds = int(retry_after)
        if seconds < retry_after:
            # Round up so a client honouring the header never retries early.
            seconds += 1
        return JSONResponse(
            {"error": message},
            status_code=429,
            headers={"Retry-After": str(seconds)},
        )