flask-website/flask/floritiweb/lib/forms/handle_params/timewindowcheck.py
2025-05-15 21:05:28 +02:00

94 lines
3.3 KiB
Python

from datetime import datetime
from . import ValidationError
import hmac, os, sqlite3
DB_FILENAME = "/var/tmp/www/forms-sent-timestamps.db"
SECRET = b"Dieser Zeitstempel wurde von uns gepraegt. -- UB/fh 24.5.24"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
MIN_DURATION = 10 # Sekunden
REFUSE_NTH_TIMESTAMP_REUSE = 3 # Diese Werte werden nur bei der Erstellung einer
MAX_REQUESTS_PER_IP = 10 # neuen SQLite3-Datenbank berücksichtigt.
def digest_of(value):
return hmac.new(SECRET, value.encode("utf-8"), "sha256").hexdigest()
def signed_fresh_timestamp_check(remote_ip, error=ValidationError):
def render_timestamp(self, name):
value = datetime.now().strftime(TIME_FORMAT)
value = "{}/{}".format(value, digest_of(value))
return f'<input name="{name}" type="hidden" value="{value}">'
def rendervalue(self, name):
value = datetime.now().strftime(TIME_FORMAT)
return "{}/{}".format(value, digest_of(value))
def timestamp_checker(field, value, remote_ip=remote_ip):
if value is None: raise error("not passed")
value, digest = value.split("/")
if not hmac.compare_digest(digest, digest_of(value)):
raise error("invalid")
timediff = datetime.now() - datetime.strptime(value, TIME_FORMAT)
db = init_db()
try:
c = db.cursor()
c.execute("""
INSERT OR IGNORE into timestamps (timestamp, digest, remote_ip) values (?, ?, ?)
""", (value, digest, remote_ip));
c.execute("UPDATE timestamps SET used=used+1 WHERE timestamp=? AND remote_ip=?",
(value, remote_ip)
)
db.commit()
except sqlite3.IntegrityError:
raise error("overused")
else:
if c.rowcount == 0:
raise error("missing")
elif timediff.days > 0:
raise error("outdated")
elif timediff.seconds < MIN_DURATION:
raise error(f"fast_as_spambot")
finally:
db.close()
return True
return {
"validate": timestamp_checker,
"render": render_timestamp,
"rendervalue": rendervalue,
"required": True
}
def init_db(filename=DB_FILENAME):
os.makedirs(os.path.dirname(filename), exist_ok=True)
conn = sqlite3.connect(filename)
if (not os.path.isfile(filename) or os.stat(filename).st_size == 0):
cursor = conn.cursor()
cursor.executescript(f"""
CREATE TABLE timestamps (
timestamp PRIMARY KEY,
digest,
remote_ip,
reqtime CURRENT_TIMESTAMP,
used DEFAULT 0 CHECK(used < {REFUSE_NTH_TIMESTAMP_REUSE})
);
CREATE TRIGGER block_hyperactive_ip
BEFORE INSERT ON timestamps
WHEN EXISTS (
SELECT 1
FROM timestamps
WHERE remote_ip=NEW.remote_ip
GROUP by remote_ip
HAVING count(timestamp)={MAX_REQUESTS_PER_IP}
)
BEGIN
SELECT RAISE(FAIL, "no more than {MAX_REQUESTS_PER_IP} requests per IP");
-- Still, cron'd housekeeping is required
END;
""")
conn.commit()
return conn