94 lines
3.3 KiB
Python
94 lines
3.3 KiB
Python
from datetime import datetime
|
|
from . import ValidationError
|
|
import hmac, os, sqlite3
|
|
|
|
# Location of the SQLite database opened by init_db().
DB_FILENAME = "/var/tmp/www/forms-sent-timestamps.db"

# Key for the HMAC computed in digest_of().
# NOTE(review): the key is committed with the source; consider loading it
# from deployment configuration instead.
SECRET = b"Dieser Zeitstempel wurde von uns gepraegt. -- UB/fh 24.5.24"

# strftime()/strptime() pattern of the timestamp part of a token.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

# Submissions arriving fewer than this many seconds after the timestamp was
# rendered are rejected by the validator.
MIN_DURATION = 10

# The two limits below are interpolated into the schema that init_db()
# creates, so they are only taken into account when a new SQLite3 database
# is created.
REFUSE_NTH_TIMESTAMP_REUSE = 3
MAX_REQUESTS_PER_IP = 10
|
|
|
|
def digest_of(value):
    """Return the hexadecimal HMAC-SHA256 of *value*, keyed with SECRET.

    *value* is a str; it is encoded as UTF-8 before being signed.
    """
    message = value.encode("utf-8")
    signer = hmac.new(SECRET, message, "sha256")
    return signer.hexdigest()
|
|
|
|
def signed_fresh_timestamp_check(remote_ip, error=ValidationError):
    """Build the form-field hooks for a signed, use-counted timestamp.

    A rendered token has the form ``"<timestamp>/<hex digest>"`` where the
    digest is ``digest_of(timestamp)``.  The validator checks the signature,
    records the use in the database returned by ``init_db()`` and rejects
    tokens that are reused too often, too old, or submitted too quickly.

    Args:
        remote_ip: Client address stored alongside each timestamp row; it is
            the default for the validator's own ``remote_ip`` parameter.
        error: Exception class raised (with a short reason code as its only
            argument) whenever validation fails.

    Returns:
        A dict with the keys ``"validate"``, ``"render"``, ``"rendervalue"``
        and ``"required"`` (always ``True``).
    """

    def rendervalue(self, name):
        """Return a fresh signed token; *self* and *name* are not used."""
        stamp = datetime.now().strftime(TIME_FORMAT)
        return "{}/{}".format(stamp, digest_of(stamp))

    def render_timestamp(self, name):
        """Return a hidden ``<input>`` named *name* holding a fresh token."""
        value = rendervalue(self, name)
        return f'<input name="{name}" type="hidden" value="{value}">'

    def timestamp_checker(field, value, remote_ip=remote_ip):
        """Validate a submitted token and count its use.

        Returns ``True`` on success.  Otherwise raises *error* with one of
        the codes ``"not passed"``, ``"invalid"``, ``"overused"``,
        ``"missing"``, ``"outdated"`` or ``"fast_as_spambot"``.
        """
        if value is None:
            raise error("not passed")

        # The token is client-controlled, so a malformed one must end up as
        # a validation error instead of an unhandled exception: split()
        # raises ValueError unless there is exactly one "/", encoding can
        # fail (UnicodeEncodeError is a ValueError), and compare_digest()
        # raises TypeError when given a non-ASCII str.
        try:
            stamp, digest = value.split("/")
            authentic = hmac.compare_digest(digest, digest_of(stamp))
        except (ValueError, TypeError) as exc:
            raise error("invalid") from exc
        if not authentic:
            raise error("invalid")

        # Only reached for a correctly signed timestamp.
        timediff = datetime.now() - datetime.strptime(stamp, TIME_FORMAT)

        db = init_db()
        try:
            c = db.cursor()
            # Register the timestamp on first sight; an already known
            # timestamp is left untouched.
            c.execute(
                "INSERT OR IGNORE into timestamps (timestamp, digest, remote_ip)"
                " values (?, ?, ?)",
                (stamp, digest, remote_ip),
            )
            c.execute(
                "UPDATE timestamps SET used=used+1 WHERE timestamp=? AND remote_ip=?",
                (stamp, remote_ip),
            )
            # Zero rows means the timestamp is stored under a different
            # remote_ip, so nothing was counted for this client.
            counted = c.rowcount
            db.commit()
        except sqlite3.IntegrityError as exc:
            # Presumably raised by the schema's CHECK on "used" or by its
            # per-IP insert trigger (see init_db) -- both mean over-use.
            raise error("overused") from exc
        finally:
            db.close()

        if counted == 0:
            raise error("missing")
        # .days and .seconds are separate timedelta components: tokens of a
        # day or more are refused first, so .seconds covers the rest.
        if timediff.days > 0:
            raise error("outdated")
        if timediff.seconds < MIN_DURATION:
            raise error("fast_as_spambot")

        return True

    return {
        "validate": timestamp_checker,
        "render": render_timestamp,
        "rendervalue": rendervalue,
        "required": True,
    }
|
|
|
|
|
|
def init_db(filename=DB_FILENAME):
    """Open the timestamp database, creating its schema when it is new.

    The schema is only written when *filename* does not exist as a file or
    is empty, so REFUSE_NTH_TIMESTAMP_REUSE and MAX_REQUESTS_PER_IP are
    fixed at the moment a database is first created.

    Args:
        filename: Path of the SQLite database file.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for
        closing it.
    """
    directory = os.path.dirname(filename)
    if directory:
        # dirname() is "" for a bare file name, and makedirs("") raises.
        os.makedirs(directory, exist_ok=True)

    conn = sqlite3.connect(filename)
    try:
        if not os.path.isfile(filename) or os.stat(filename).st_size == 0:
            # IF NOT EXISTS keeps a second process that also saw the empty
            # file from failing on "table already exists".
            conn.executescript(f"""
                CREATE TABLE IF NOT EXISTS timestamps (
                    timestamp PRIMARY KEY,
                    digest,
                    remote_ip,
                    reqtime DEFAULT CURRENT_TIMESTAMP,
                    used DEFAULT 0 CHECK(used < {REFUSE_NTH_TIMESTAMP_REUSE})
                );
                CREATE TRIGGER IF NOT EXISTS block_hyperactive_ip
                BEFORE INSERT ON timestamps
                WHEN EXISTS (
                    SELECT 1
                    FROM timestamps
                    WHERE remote_ip=NEW.remote_ip
                    GROUP by remote_ip
                    HAVING count(timestamp)>={MAX_REQUESTS_PER_IP}
                )
                BEGIN
                    SELECT RAISE(FAIL, 'no more than {MAX_REQUESTS_PER_IP} requests per IP');
                    -- Still, cron'd housekeeping is required
                END;
            """)
            conn.commit()
    except BaseException:
        # Do not leak the connection when schema creation fails.
        conn.close()
        raise
    return conn
|