Compare commits

...

3 Commits

Author SHA1 Message Date
Florian Heß
dd05db67a6 Add substantial content to README.md 2025-05-15 21:05:28 +02:00
Florian Heß
dc90e61687 markdown filter for content block 2025-05-15 21:05:28 +02:00
Florian Heß
a5c74b0ba7 Added files. 2025-05-15 21:05:28 +02:00
23 changed files with 1469 additions and 2 deletions

View File

@ -1,3 +1,76 @@
# flask-website
Website auf Jinja2-Basis
=========================
Die Website auf Mason2-Basis (bzw. Mason1 auf der digi-Instanz) soll durch eine
Eigenimplementierung auf Jinja2 migriert werden.
Jinja2 läuft auf Django- oder auf Flask-Basis. Wir bevorzugen Flask, da wir
mutmaßlich keine Datenbank brauchen werden, worauf das mächtigere Django
stark ausgerichtet ist. Flask ist in der Beziehung leichtgewichtiger.
`flask`
-----------
Unser auf Jinja2-basierter Application Server mit flask.
`templates` und Layoutkaskaden
----------------------------------
`**/layout.jtml` heißen die kaskadenartig angewendeten Layoutdateien. Wird eine normale Seite mit der
Dateinamensendung `.html` aufgerufen, wird zunächst versucht, eine Datei namens `layout.jtml` aus dem
gleichen Verzeichnis zu laden. Schlägt dies fehl, wird die Suche auf der nächsthöheren Verzeichnisebene
fortgesetzt, und so weiter. Das gleiche geschieht, wenn für eine `layout.jtml` wiederum die nächsthöhere
gefunden werden soll, aber nur solange jeweils die erste Zeile darin lautet:
{% extends layout() %}
Anstelle von `layout()` kann auch der Dateipfad von einem beliebigen anderen Template stehen.
Dieses wird dann verwendet. Enthält es aber selbst wiederum `{% extends layout() %}`, wird zu derselben
Kaskade zurückgekehrt, als würde es in der eigentlich aufgerufenen Seite stehen.
Jedwedes direkt angegebene Template außer `layout()` schiebt sich gewissermaßen zwischen
die aufgerufene Seite und die weitere, normale Layoutkaskade.
`DSL.*`
-------
Jinja2-Migrationen der Mason-Snippets (`<& /nav1/img, name => "neu" &>`) sind aufrufbar wie folgt:
`{{ DSL.img(name="neu") }}`
Sprachinfix
-----------
Der Dateiendung der aufgerufenen Seite kann ein `de.`, `en.`, `fr.` oder `it.` vorangehen. Weitere
Sprachen können jederzeit hinzugefügt werden. Da es aber auch Top-Level-Verzeichnisse mit zwei
Buchstaben langen Namen geben kann (z.B. `nc/`), werden nur registrierte Sprachkürzel nach dieser
Logik berücksichtigt.
Erscheint in der URL als erstes Pfadsegment nach der Domain ein Sprachinfix (statt "en" geht
auch "Englisch/", "English", groß oder klein geschrieben), so wird bevorzugt die
entsprechende Fassung aufgerufen.
Alternativ, sollte die angefragte Fassung der Seite nicht gefunden werden, wird die englische bzw. die
deutsche Fassung genommen.
Aufrufe der Funktion `DSL.i18n(de="Text in deutscher Sprache", en="...", ...)` lösen sich entsprechend
der gefundenen Fassung auf, wenn der Textstring in der angefragten Sprache nicht verfügbar ist. Danach
wird die englische, dann die deutsche Fassung probiert.
Gleiches gilt für `DSL.i18n` im Filtermodus: `{% filter DSL.i18n %}...{% endfilter %}` kann immer dann
benutzt werden, wenn für unterschiedliche Sprachen verschiedene HTML-Markups verwendet werden sollen. Alle
von diesem Filter umfassten Inhalte werden gefiltert nach Elementen mit lang-Attribut. Nur diejenigen
dieser Elemente, die auf die angefragte bzw. gefundene Sprachfassung passen, werden samt Inhalt angezeigt.
Markdown
--------
Es wird ein Filter `markdown` definiert. In der Layout-Datei kann man diesen wie folgt nutzen:
{% filter markdown %}
{% block content %}Diese Seite hat keinen Inhalt, denn es ist nur die layout.jtml-Datei.{% endblock %}
{% endfilter %}
Application Gateway uwsgi, Application Webserver flask und Template Engine jinja2 konfiguriert und programmiert.

2
flask/.flaskenv Normal file
View File

@ -0,0 +1,2 @@
FLASK_APP=ubhd_flask
FLASK_ENV=development

11
flask/config.py Normal file
View File

@ -0,0 +1,11 @@
class Config:
DEBUG = False
TESTING = False
class ProductionConfig(Config):
SITE_URL = 'https://floritiv.de'
TEMPLATES_AUTO_RELOAD = True
class DevelopmentConfig(Config):
SITE_URL = 'https://floritiv.de'
DEBUG = True

View File

@ -0,0 +1,206 @@
import os, json, re
from glob import glob
from datetime import date
from jinja2 import pass_context
from markdown import Markdown
#from jinja2 import contextfilter
from flask import Flask, render_template, make_response, abort, redirect, send_from_directory, request
from .lib.dynamic_snippet_library import DynamicSnippetLibrary
TEMPLATES_DIR = '/web/htdocs'
def create_app(test_config=None):
app = Flask(
__name__,
instance_relative_config=True,
template_folder=TEMPLATES_DIR
)
if test_config is None:
if os.environ.get('FLASK_ENV') == 'development':
app.config.from_object("config.DevelopmentConfig")
else:
app.config.from_object("config.ProductionConfig")
else:
app.config.from_mapping(test_config)
try:
os.makedirs(app.instance_path)
except OSError:
pass
app.jinja_env.filters['DSL.i18n'] = locate_content
markdown = Markdown()
app.jinja_env.filters['markdown'] = lambda s: markdown.convert(s)
@app.route("/")
def home():
return redirect(app.config['SITE_URL'] + '/index.html', code=302)
@app.route("/<path:template>", methods=["GET","POST"])
def call_template(template):
if template.endswith('/Welcome.html'):
return redirect(app.config['SITE_URL'] + '/' + re.sub(r'[^/]+$', '', template), code=301)
m = re.search(r"([/.])[^/.]*$", template)
if m and m.group(1) == '/':
template = re.sub(r"/?$", "/index.jtml", template, count=1)
elif template.endswith('.html') and os.path.exists(
os.path.join(TEMPLATES_DIR, template)
):
pass # fall-through to upcoming send_from_directory()
else:
template = re.sub(r'h(?=tml$)', 'j', template)
if not template.endswith('.jtml'):
return send_from_directory(TEMPLATES_DIR, template)
m = re.match('([Ee]nglisc?h|[a-z]{2})/', template)
if m:
orig_lang = m.group(0)[:-1]
if 'nglis' in orig_lang:
# normalize
orig_lang = 'Englisch'
langcode = m.group(1)[:2].lower()
else:
langcode = ''
if langcode not in ('de', 'en', 'it', 'fr'):
orig_lang = ''
langcode = 'de'
elif m:
template = template.split('/', 1)[1]
layout = gather_layouts_in_ascending_directory_levels(template)
innermost_layout_name = 'layout.jtml'
next(layout)
def layout_finder(name=None):
nonlocal innermost_layout_name
if name is not None:
innermost_layout_name = name
return layout.send(innermost_layout_name)
template_lang, mtime_date, found_template = try_templates(template, langcode)
other_templates_rx = r'(?<=\.)\w\w(?=.jtml$)'
other_templates = re.sub(other_templates_rx, '*', found_template)
if '*' in other_templates:
other_templates = glob(os.path.join(TEMPLATES_DIR, other_templates))
available_la = []
for ot in other_templates:
if m := re.search(other_templates_rx, ot):
la = m.group(0)
else: continue
available_la.append(la)
if not('de' in available_la or 'en' in available_la):
raise RuntimeError(
"No other templates: Probed {}. de and en missing".
format(', '.join(available_la))
)
else:
available_la = None
def check_user_scope():
# find out if access comes from
# - Earth,
# - university campus (and if member or student)
# - or ourselves, Heidelberg University Library, IT dep.
ip_address = (
request.headers.getlist('X-Forwarded-For')[0]
or request.remote_addr
)
return ip_address
return render_template(found_template,
layout=layout_finder,
date=mtime_date,
DSL=DynamicSnippetLibrary(
root=app.config['SITE_URL'],
path=template.replace('.jtml', '.html'),
available_la=available_la,
orig_la=orig_lang,
requested_la=langcode,
found_la=template_lang,
passed_args=request.args,
passed_data=request.form
),
get_real_ip=check_user_scope,
)
@app.errorhandler(404)
def notfound_page(e):
return make_response(render_template(
'http404.jtml',
DSL=DynamicSnippetLibrary(
root=app.config['SITE_URL'],
path=request.path,
orig_la='',
requested_la='',
found_la='de',
)), 404
)
return app
def gather_layouts_in_ascending_directory_levels(template):
layout_name = yield
if template.endswith("/" + layout_name):
template, _ = os.path.split(template)
while True:
directory, _ = os.path.split(template)
if not len(directory.strip('/.')):
layout_name = yield layout_name
break
possible_template = os.path.join(directory, layout_name)
if os.path.exists(os.path.join(TEMPLATES_DIR, possible_template)):
layout_name = yield possible_template
template = directory
def try_templates(template, langcode):
tried = []
for lang in (langcode, 'en', 'de'):
suffixed = re.sub(r'(?=\.jtml$)', f'.{lang}', template)
path = os.path.join(TEMPLATES_DIR, suffixed)
if os.path.isfile(path):
template = suffixed
break
else:
tried.append(path)
else:
path = os.path.join(TEMPLATES_DIR, template)
lang = None
if not os.path.isfile(path):
abort(404)
last_update_date = date.fromtimestamp(os.path.getmtime(path)).strftime(
"%d.%m.%Y" )
return lang, last_update_date, template
@pass_context
#@contextfilter
def locate_content(ctx, content):
"""
Show elements with lang or xml:lang attributes, plus their contents,
only if the requested language matches.
Always show elements neither lang-specified nor contained in a
lang-specified ancestor element.
"""
lang_attr_value_rx = r'\blang=["\']?(\w\w)'
included = set((i, i) for i in re.findall(lang_attr_value_rx, content))
la = ctx['DSL'].i18n(**dict(included))
return re.sub(
r'<(\w+)[^>]+?' + lang_attr_value_rx + r'[^>]*>(.+?)</\1>',
lambda m: m.group(0) if m.group(2) == la else '',
content
)

Binary file not shown.

View File

@ -0,0 +1,132 @@
import json, re
from markupsafe import Markup
#from jinja2 import Markup
from werkzeug.exceptions import BadRequest
from importlib import import_module
class DynamicSnippetLibrary:
params_for = None
def __init__(self, root, path,
orig_la=None,
requested_la=None,
found_la=None,
available_la=None,
passed_args={},
passed_data={}
):
self.orig_la = orig_la
self.requested_la = requested_la
self.found_la = found_la
if available_la is None:
self.language_versions_available = []
else:
self.language_versions_available = available_la
self._root = root + '{LA}'
self.path = path
self.params = (passed_args, passed_data, {})
def change_root(self, new_root, strip=None):
m = re.match(r"\w+:", new_root)
mla = '{LA}' in new_root
if m and not mla:
new_root = new_root + '/{LA}'
if strip is None:
strip = new_root
if strip and self.path.startswith(strip):
self.path = self.path[ len(strip) : ]
self._root = new_root if m else self._root + '/' + new_root
return '<!-- Root: ' + self._root + '; orig_la=' + self.orig_la + ' -->'
def root(self, lang=None):
if lang is None:
lang = self.orig_la
if not lang:
return re.sub(r'\{LA\}/?', '', self._root)
else:
return self._root.replace('{LA}', lang)
def i18n(self, **langstr):
for lang in (self.requested_la, self.found_la, 'en', 'de'):
if lang in langstr:
return langstr[lang]
else:
return '???' + lang + '???'
from .snippets.marginews import marginews
from .snippets.img import img
from .snippets.email import email
from .snippets.up import up
from .snippets.snippet import snippet
# {% set ffg, success = DSL.get_form("mailform", ...) %}
# ...
# {{ ffg(name="subject") }}
def get_form(self, module, **initargs):
mod = import_module(re.sub(r"^\.*", "..forms.", module), __name__)
hp = mod.init(self.params[-1],initargs.pop("altfields",{}),initargs.pop("defaults",{}),**initargs)
is_sent = self.params[-1].pop('send',False)
if is_sent:
success = hp.validate_params(self.params[-1])
if success is True:
mod.process(hp)
error = False
else:
success = None
def form_field_generator(returntype, **kwargs):
# Parameterwert annehmen und validieren
field = hp.fields[kwargs["name"]]
if returntype == 'str':
return field.value
elif returntype == 'html':
if not field.is_valid:
error = field.error_msg
else:
error = False
return Markup(field.render(**kwargs) + (f"<span class='error'>{error}</span>" if error else ""))
# HTML-Feld darstellen.
# return MarkupSafe(...)
return form_field_generator, success
def virtuelleAusst(self): # Das ist doch ein Autohandler, oder?
pass
def flatten_meta(self, meta):
if isinstance(meta, dict):
metad = meta
else:
metad = json.loads(meta)
super_meta = metad.pop('', None)
if super_meta is None:
self.meta = metad
if not len(self.language_versions_available) \
and isinstance(metad['title'], dict):
self.language_versions_available = metad['title'].keys()
metad['title'] = self.i18n(**metad['title'])
return metad
super_meta.update(metad)
super_meta = self.flatten_meta(super_meta)
if 'allow_post_data' in super_meta:
for key in super_meta['allow_post_data']:
if key.endswith('+'):
getter = self.params[1].getlist
key = key[:-1]
else:
getter = self.params[1].get
if key in self.params[1]:
self.params[2][key] = getter(key)
if 'allow_url_params' in super_meta:
for key in super_meta['allow_url_params']:
if key.endswith('+'):
getter = self.params[0].getlist
key = key[:-1]
else:
getter = self.params[0].get
self.params[2].setdefault(key, getter(key))
return super_meta

1
flask/floritiweb/lib/forms/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__/

View File

@ -0,0 +1,215 @@
import re
from .field import Field, ValidationError
import sys
class HandleParams:
def __init__(
self, params={}, sequence=None, ignored_fields=None, defaults=None, altered_fields=None
):
self.sequence = list(sequence) if sequence else []
self.unrecognized = []
self.required_fields = set()
self.altered_fields = dict(altered_fields) if altered_fields else {}
self.defaults = dict(defaults) if defaults else {}
for key in self.defaults.keys():
if len(params.get(key,'') or '') > 0:
self.defaults[key] = params[key]
self.fields = {}
self.ignored_fields = set(ignored_fields or [])
def add_field(self, name, properties, **alter_cfg):
properties['name'] = name
variant = self.altered_fields.get(name)
if variant is not None:
if variant in alter_cfg:
cfg = alter_cfg[variant]
if not isinstance(cfg, dict):
raise RuntimeError(f'{name}: Not a dictionary for variant {variant}: {cfg}')
alter_cfg = dict(cfg)
else:
# A variant is requested that is not known / supported.
known_keys = ','.join(alter_cfg)
raise RuntimeError(f"Field-def {name}: No variant {variant} (keys: {known_keys})")
else:
alter_cfg = {}
for attr, acfg in alter_cfg.items() :
if attr.startswith('wrap_'):
attr = attr[5:]
wrap = True
else:
wrap = False
orig = properties.get(attr)
properties[attr] = lambda orig, *args, **kwargs: acfg(orig, *args, **kwargs) if wrap else acfg
if properties.pop('required', False) == True:
self.required_fields.add(name)
field = Field(**properties)
default = self.defaults.get(name)
if default is not None:
default = filter_value(field.apply_filters, name, default)
try:
if field.validate(default): field.value = default
except RuntimeError as e: # HandleParams.ValidationError raised spurious errors when
if len(default) > 0: # used flask is used with uwsgi
raise
if hasattr(field, 'digest_key') or field.multiple:
self.sequence.append(name)
self.fields[name] = field
def validate_params(self, params, set_invalid_value=True):
success = True
ignore = self.ignored_fields
validate_seq = [*self.sequence]
defer_validations = set(validate_seq)
tmp = {}
## raise RuntimeError(type(params))
for fname in self.required_fields:
if len(params.get(fname, '')) == 0:
params[fname] = None
fieldlist = open('/tmp/validated_fields.txt','w')
def paramsiter():
for fldname, fldvalue in params.items():
if isinstance(fldvalue, list):
for v in fldvalue:
yield fldname, v
else:
yield fldname, fldvalue
## raise RuntimeError(list(paramsiter()))
## raise RuntimeError(list(paramsiter()))
## raise RuntimeError(params.items())
## raise RuntimeError("defer_validations", list(defer_validations))
## if "foerderung" in defer_validations:
## raise RuntimeError("foerderung in defer_valitations.")
## else:
## raise RuntimeError("foerderung not in defer_validations.")
## for fldname, fldvalue in params.items():
## raise RuntimeError(params.items())
## if fldname == 'foerderung':
## raise RuntimeError("type", type(fldvalue), fldvalue)
## if fldname == 'foerderung' and len(fldvalue) > 1:
## raise RuntimeError(fldvalue)
for fldname, fldvalue in paramsiter():
## raise RuntimeError(fldname)
## if len(fldvalue) > 1:
## raise RuntimeError(fldvalue)
## if fldname == 'foerderung':
## raise RuntimeError(fldname + ' ' + fldvalue, type(fldvalue))
if fldvalue is not None and len(fldvalue) == 0:
continue
if ( fldname not in defer_validations
and fldname in self.fields
):
## raise RuntimeError(fldname)
## if fldname == 'foerderung':
## raise RuntimeError('test')
## raise RuntimeError(fldname + ' ' + fldvalue)
field = self.fields[fldname]
value = filter_value(field.apply_filters, fldname, fldvalue)
try:
this_success = field.validate(value)
## von debugging sessions
if fldname == 'dsgvochecked' and this_success is False:
## raise RuntimeError("this_success: " + this_success)
raise RuntimeError(value)
except HandleParams.ValidationError as e:
field.error_msg = str(e)
field.is_valid = False
print(fldname, file=fieldlist)
this_success = False
else:
field.is_valid = True
take_this_value = value is not None if set_invalid_value else this_success
if take_this_value:
field.value = value
if not(success and this_success):
success = False
elif not fldname in ignore:
#if hasattr(self.fields[fldname], 'digest_key'):
# raise RuntimeError(fldname)
#elif fldname.startswith("nachweise"):
# raise RuntimeError(fldname)
## raise RuntimeError("validate_seq: ", validate_seq)
for fname in validate_seq:
key_tmpl = {}
field = self.fields[fname]
if hasattr(field, 'digest_key'):
key_tmpl = field.digest_key(fldname)
if not key_tmpl: continue
value = filter_value(field.apply_filters, fname, fldvalue)
key_tmpl['value'] = value
value = key_tmpl
tmp.setdefault(fname, {})
slot = tmp[fname]
elif fldname == fname:
value = filter_value(field.apply_filters, fname, fldvalue)
slot = tmp
## raise RuntimeError("slot: ",slot)
else:
continue
if field.multiple:
slot.setdefault(fldname, [])
slot[fname].append(value)
## raise RuntimeError(fldname + ' ' + value)
else:
slot[fldname] = value
## raise RuntimeError(tmp)
fullfilled = False
for fldname, fldvalue in tmp.items():
field = self.fields[fldname]
try:
##raise RuntimeError("fldvalue2: ", fldvalue)
this_success = field.validate(fldvalue)
except HandleParams.ValidationError as e:
field.error_msg = str(e)
field.is_valid = False
this_success = False
else:
field.is_valid = True
take_this_value = fldvalue is not None if set_invalid_value else this_success
if take_this_value:
field.value = fldvalue
fullfilled = True
if not(success and this_success):
success = False
#if not fullfilled:
# self.unrecognized.append(fldname)
# continue
##raise RuntimeError(tmp)
return success
ADD_FILTERS = {}
def filter_value(filters_to_apply, name, value):
if isinstance(value, str):
value = re.sub(r'[\;\"\(\)\{\}\\\`\|\%\r\<\>\&\0]', '', value)
for fname in filters_to_apply:
if not fname in ADD_FILTERS:
raise RuntimeError(f"No filter {fname}")
if not isinstance(value, list):
value = [list]
for v in value:
try:
value = FILTERS[fname](v)
except HandleParams.ValidationError as e:
field.error_msg = str(e)
return
return value

View File

@ -0,0 +1,45 @@
CALLBACKS = {
'validate': lambda self, value: True,
'render': lambda self, *args, **kwargs: '',
'digest_key': None,
'rendervalue': lambda self, *args, **kwargs: self.value,
'initializer': None,
}
class Field:
def wrap_callback(self, cb):
if cb is None:
return
def func(*args, **kwargs):
return cb(self, *args, **kwargs)
return func
def __init__(self, *, is_multiple=False, **callbacks):
self.apply_filters = []
for cb in CALLBACKS.keys():
func = self.wrap_callback(
callbacks.get(cb, CALLBACKS[cb])
)
if func is not None:
setattr(self, cb, func)
self.initializer = callbacks.get('initializer', lambda: '')
#self.digest_key = callbacks.get('digest_key', None)
self.error_msg = ''
self.multiple = is_multiple
self.options_map = {}
self.is_valid = None
@property
def value(self):
if not hasattr(self, '_value'):
self._value = self.initializer()
return self._value
@value.setter
def value(self, value):
self._value = value
class ValidationError(RuntimeError):
pass

View File

@ -0,0 +1,93 @@
from datetime import datetime
from . import ValidationError
import hmac, os, sqlite3
DB_FILENAME = "/var/tmp/www/forms-sent-timestamps.db"
SECRET = b"Dieser Zeitstempel wurde von uns gepraegt. -- UB/fh 24.5.24"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
MIN_DURATION = 10 # Sekunden
REFUSE_NTH_TIMESTAMP_REUSE = 3 # Diese Werte werden nur bei der Erstellung einer
MAX_REQUESTS_PER_IP = 10 # neuen SQLite3-Datenbank berücksichtigt.
def digest_of(value):
return hmac.new(SECRET, value.encode("utf-8"), "sha256").hexdigest()
def signed_fresh_timestamp_check(remote_ip, error=ValidationError):
def render_timestamp(self, name):
value = datetime.now().strftime(TIME_FORMAT)
value = "{}/{}".format(value, digest_of(value))
return f'<input name="{name}" type="hidden" value="{value}">'
def rendervalue(self, name):
value = datetime.now().strftime(TIME_FORMAT)
return "{}/{}".format(value, digest_of(value))
def timestamp_checker(field, value, remote_ip=remote_ip):
if value is None: raise error("not passed")
value, digest = value.split("/")
if not hmac.compare_digest(digest, digest_of(value)):
raise error("invalid")
timediff = datetime.now() - datetime.strptime(value, TIME_FORMAT)
db = init_db()
try:
c = db.cursor()
c.execute("""
INSERT OR IGNORE into timestamps (timestamp, digest, remote_ip) values (?, ?, ?)
""", (value, digest, remote_ip));
c.execute("UPDATE timestamps SET used=used+1 WHERE timestamp=? AND remote_ip=?",
(value, remote_ip)
)
db.commit()
except sqlite3.IntegrityError:
raise error("overused")
else:
if c.rowcount == 0:
raise error("missing")
elif timediff.days > 0:
raise error("outdated")
elif timediff.seconds < MIN_DURATION:
raise error(f"fast_as_spambot")
finally:
db.close()
return True
return {
"validate": timestamp_checker,
"render": render_timestamp,
"rendervalue": rendervalue,
"required": True
}
def init_db(filename=DB_FILENAME):
os.makedirs(os.path.dirname(filename), exist_ok=True)
conn = sqlite3.connect(filename)
if (not os.path.isfile(filename) or os.stat(filename).st_size == 0):
cursor = conn.cursor()
cursor.executescript(f"""
CREATE TABLE timestamps (
timestamp PRIMARY KEY,
digest,
remote_ip,
reqtime CURRENT_TIMESTAMP,
used DEFAULT 0 CHECK(used < {REFUSE_NTH_TIMESTAMP_REUSE})
);
CREATE TRIGGER block_hyperactive_ip
BEFORE INSERT ON timestamps
WHEN EXISTS (
SELECT 1
FROM timestamps
WHERE remote_ip=NEW.remote_ip
GROUP by remote_ip
HAVING count(timestamp)={MAX_REQUESTS_PER_IP}
)
BEGIN
SELECT RAISE(FAIL, "no more than {MAX_REQUESTS_PER_IP} requests per IP");
-- Still, cron'd housekeeping is required
END;
""")
conn.commit()
return conn

View File

@ -0,0 +1,237 @@
from .handle_params import HandleParams, timewindowcheck
from email.message import EmailMessage
from subprocess import Popen, PIPE
import sys
import datetime
from textwrap import TextWrapper
from io import StringIO
import os
aa = open('/web/etc/allowed.adress')
addresses = {}
for line in aa:
short, longname, mailAddress = line.split(";")
addresses[short] = (longname, mailAddress.strip())
try:
aa.close()
except AttributeError:
pass
class ValidationError(RuntimeError):
def __init__(self, message):
self.langs = { 'de': message, 'en': message }
def __str__(self):
return self.langs['de']
HandleParams.ValidationError = ValidationError
class MultiLangExc(ValidationError):
def __init__(self, **langs):
self.langs = langs
timewindowcheck.init_db()
def init(params, altfields, defaults, check_remote_ip='0.0.0.0'):
hp = HandleParams(altered_fields=altfields, params=params, defaults=defaults)
def render_text(field, **args):
value = field.value
attr = attributes_to_string(args)
return f'<input style="width:75%;" type="text" value="{value}" {attr}>'
def render_textfield(field,**args):
value = field.value
attr = attributes_to_string(args)
return f'<textarea style="width: 75%; min-height: 12em;" {attr}>{value}</textarea>'
def render_checkbox(field, **args):
attr = attributes_to_string(args)
return f'<input type="checkbox" {attr} >'
mandatory_text = { 'de': "obligatorisch", 'en': "mandatory" }
def render_local_time(field,**args):
return str(datetime.datetime.now())
def must_be_nonempty(field, value):
if value is None or len(value) == 0:
raise MultiLangExc(**mandatory_text)
return True
def must_be_nonempty_then_validate(inner_val, field, value):
if len(value) == 0:
raise MultiLangExc(**mandatory_text)
return inner_val(self, value)
def validate_honeypot_field(field,value):
if len(value):
raise ValidationError(
"Sie haben ein verstecktes Feld ausgefüllt"
"(Honigtopf für Spambots)"
)
return True
def pass_longname(field, name):
value = field.value
try:
longname = addresses[value][0]
except KeyError:
longname = "Non-existent Enterprises Ltd (will forward)"
value = "ub"
return longname + f"<input type='hidden' name='adr' value='{value}'>"
def check_is_within_last24h():
pass
bei_allen_feldern_mgl = {
"mandatory": {
"required": True,
"wrap_validate": must_be_nonempty_then_validate,
}
}
hp.add_field("realname", {
"render":render_text
},
**bei_allen_feldern_mgl
)
hp.add_field("subject", {
"render":render_text,
"required": True,
})
hp.add_field("name", {
"validate": validate_honeypot_field,
"render":render_text
},
)
hp.add_field("body", {
"render":render_textfield,
"required": True,
})
hp.add_field("mailadr", {
"render":render_text
})
hp.add_field("wohnort", {
"render":render_text
})
hp.add_field("adr", {
"render":pass_longname
})
def twc_error(which):
msgs = {
"not passed": {
"de": "Spamprüfung gescheitert: kein timewindowcheck-Wert übergeben",
"en": "Spam detection failed: No field 'timewindowcheck' passed"
},
"invalid": { "de": "Zeitstempel und Digest ungültig", "en": "Timestamp and digest invalid" },
"missing": { "de": "Zeitstempel oder IP-Adresse nicht ermittelbar", "en": "Timestamp or IP address not indicated" },
"overused": { "de": "Der Zeitstempel wurde zu häufig verwendet", "en": "The timestamp has been used too often" },
"outdated": { "de": "Die Formularabfrage ist zu alt. Bitte laden Sie die Seite neu.", "en": "Form request is too old. Please reload the page." },
"fast_as_spambot": { "de": "Sie sind so schnell wie ein Spambot (oder schneller). Bitte lassen Sie sich vor dem Senden etwas mehr Zeit.", "en": "You are as fast as a spambot (or faster). Please have a breath or two before sending." },
}
return MultiLangExc(**msgs[which])
hp.add_field(
'timewindowcheck',
timewindowcheck.signed_fresh_timestamp_check(check_remote_ip, twc_error)
)
hp.add_field("dsgvochecked", {
"render":render_checkbox,
"rendervalue":render_local_time,
"required": True,
"validate": must_be_nonempty
})
return hp
def process(hp):
form = {
"adr": "adr",
"subject": "subject",
"realname": "realname",
"body": "body",
"ort": "wohnort",
"mymail": "mailadr",
"dsgvo_ts": "dsgvochecked",
}
for k,v in form.items():
form[k] = hp.fields[v].rendervalue() or hp.fields[v].value
adresse_formular = addresses[ hp.fields["adr"].value ][1]
# Mail bzw. Request rausschicken
send_mail(form, adresse_formular)
return form
def attributes_to_string(args):
attrs = []
for key, value in args.items():
attrs.append(f'{key}="{value}"')
return " ".join(attrs)
def send_mail(data, adr):
_w = TextWrapper(width=67, subsequent_indent=" "*13)
def wrap(label, slot):
_w.initial_indent = label+" "*(11-len(label))+': '
return _w.wrap(data[slot])
form = {
"Adressat": "adr",
"Betreff": "subject",
"Inhalt": "body",
"Absender": "realname",
"Wohnort": "ort",
"Antwort an": "mymail",
"DSE z.k.g.": "dsgvo_ts"
}
with open('/tmp/data_test.txt', 'w') as outfile:
outfile.write(str(data))
userinput = list()
for label,slot in form.items():
userinput.append('\n'.join(wrap(label,slot)))
text = ('' if data['mymail'] else """
-----------------------------------------------------------------
Bitte nicht die REPLY-Funktion verwenden. Es wurde keine Absende-
adresse angegeben.
Dieser Hinweis kann geloescht werden.
-----------------------------------------------------------------
""") + "\n".join(userinput)
mail = EmailMessage()
mail['From'] = 'WWW-Formular <noreply@example.invalid'
#mail['From'] = data['mymail'] or 'WWW-Formular'
mail['To'] = adr
del mail['Subject']
mail['Subject'] = data['subject']
mail.add_header('Content-Type', 'text/plain;charset=utf-8')
mail.set_payload(text)
mail_string = str(mail)
sml = Popen(['/usr/sbin/sendmail', '-t', '-oi'], stdin=PIPE)
#err = sml.communicate(mail.as_bytes(policy=mail.policy.clone(linesep='\r\n')))
with open('/var/tmp/mail_string_encode.txt', 'w') as file:
print(mail_string, file=file)
err = sml.communicate(mail_string.encode('utf8'))

View File

@ -0,0 +1,37 @@
from handle_params import HandleParams, ValidationError
import pytest, re
def regex_validator(regex, error_message):
def _validate(_, value):
if len(value) > 0:
m = re.search(regex, value)
else:
return
if m:
return value
else:
raise ValidationError(error_message)
return _validate
def test_hp_init():
hp = HandleParams()
def test_hp_with_text_field():
hp = HandleParams()
hp.add_field('name', {
'validate': regex_validator(
r'^([A-Z]\S+\.?) ([A-Z]\S+\.? )?([A-Z]\S+\.?)$',
'Does not look like a name'
)
})
hp.validate_params({
'name': 'Florian Heß'
})
assert hp.fields['name'].value == 'Florian Heß'
with pytest.raises(ValidationError):
assert hp.validate_params({ 'name': '123' })

View File

@ -0,0 +1 @@
forms/handle_params

View File

@ -0,0 +1,28 @@
Snippets
========
Hier enthalten sind die Python-Versionen der Snippets, die bei Mason unter /nav1/...
enthalten waren.
Alte Notation
-------------
<& /nav1/snippetname, arg1 => "value1", arg2 => "value2" &>
Neue Notation
-------------
{{ DSL.snippetname(arg1="value1", arg2="value2") }}
DSL steht für "DynamicSnippetLibrary" und enthält neben den Snippets, die jeweils
in ihren eigenen Dateien stehen (ggf. mit etwaigen Datenstrukturen, die statisch
oder dynamisch bei Serverstart befüllt werden), auch:
* `root`: Gibt die Seitenwurzel (URL) ggf. mit Sprachpräfix zurück. Eignet sich für Menüs.
* `i18n`: Argumente mit den Namen eines Sprachpräfix (de, en, etc.). In Abhängigkeit von der
aktuellen Sprache wird nur der entsprechende String angezeigt. Verwendet in Layouts, Snippets
und Seiten ohne Sprachpräfix, da sie wenig Texte enthalten.

View File

@ -0,0 +1,79 @@
from markupsafe import Markup
#from jinja2 import Markup
mail_registry = {}
def email( self,
an=None,
kuerzel=None,
name=None,
body=None,
subject=None,
title_include_email=False
):
k = an or kuerzel
if k not in mail_registry:
k = 'ub'
kuerzel = mail_registry[k]
realname = kuerzel[0]
if name == '{@}':
masked_addr = kuerzel[1].replace('@', """
<img src="/bilder/inOCRable_at-sign.png
title="Klammeraffe" style="height:1em;position:relative;top:2px;right:1px;"
alt="&#64;"
/>
""")
name = ( masked_addr[0] +
'<span class="unsichtbar" title="Hierüber sollen Adress-Sammelprogramme stolpern, '
'Sie stolpern hoffentlich nicht:">LÖSCHE-DIESE-SP@MVERHINDERUNG</span>'
+ masked_addr[1:]
)
if name is None:
name = realname
if title_include_email is True:
realname += f'&lt;{kuerzel[1]}&gt;'
return (
# Markup(f'<a href="{self.root()}kontakt/email.html?adr={k}')
Markup(f'<a href="mailto:{kuerzel[1]}"')
+ (Markup('&subject=') + subject if subject is not None else '')
+ (Markup("&body=") + body if body is not None else '')
+ Markup(f'" title="mit Ihrem E-Mail-Programm eine E-Mail an {realname} senden">{self.img(name="email")} {name}</a>')
)
with open("/web/etc/allowed.adress") as fh:
for line in fh:
line = line.rstrip("\r\n")
kuerzel, name, mail = line.split(";")
mail_registry[kuerzel] = (name, mail)
#
# open my $fh, '<', "/web/etc/allowed.adress" or die $!;
#
# while ( my $line = <$fh> ) {
# chomp $line;
# my ($kuerzel,$name,$mail) = split /;/, $line;
# $kuerzel{$kuerzel} = [$name, $mail];
# }
#
# </%class>
#
# <%perl>
# my $k = $.an || $.kuerzel;
# my $kuerzel = $kuerzel{$k} || $kuerzel{$k='ub'};
# my $realname = $kuerzel->[0];
# if ( ($.name//"") eq "{@}" ) {
# (my $masked_adr = $kuerzel->[1] )
# =~ s{@}{<img src="/bilder/inOCRable_at-sign.png" title="Klammeraffe" style="height:1em;position:relative;top:2px;right:1px;" alt="&#64;" />};
# substr($masked_adr, 1, 0, q{<span class="unsichtbar" title="Hierüber sollen Adress-Sammelprogramme stolpern, Sie stolpern hoffentlich nicht:">LÖSCHE-DIESE-SP@MVERHINDERUNG</span>});
# $.name($masked_adr);
# }
# my $name = $.name || $realname;
# $realname .= sprintf(' &lt;%s&gt;', $kuerzel->[1]) if $.title_include_email;
# </%perl>
#
# <a href="<% $m->page->root() %>/kontakt/email.html?adr=<% $k %>;subject=<% $.subject %>;body=<% $.body %>" title="zum Formular, um E-Mail an <% $realname %> zu senden"><& /nav1/img, name => 'email' &><% $name %></a>

View File

@ -0,0 +1,202 @@
from markupsafe import Markup
#from jinja2 import Markup
def img(self, name):
attrs = dict(IMG_INVENTORY[name])
for attr, value in attrs.items():
try:
attrs[attr] = self.i18n(**value)
except TypeError:
pass
attrs['src'] = attrs.pop('url')
attrs = ' '.join([ f'{a}="{v}"' for a, v in attrs.items() ])
return Markup(f'<img {attrs} />')
IMG_INVENTORY = {
'info': {
'url': "/nav1/grafik/all/info-button.gif",
'alt': "i",
'title': {
"de": "weitere Informationen",
"en": "further information",
}
},
'zurueck': {
'url': "/nav1/grafik/all/zurueck-button.gif",
'alt': { "de": "zur&uuml;ck", 'en': "back" },
'title': { "de": "eine Seite zur&uuml;ck", 'en': "back" },
},
'top': {
'url': "/nav1/grafik/all/top-button.gif",
'alt': { "de": "hoch", "en": "top of page" },
'title': { "de": "zum Seitenanfang", "en": "top of page" }
},
'uhr': {
'url': "/nav1/grafik/all/uhr-button.gif",
'alt': { "de": "Uhr", "en": "watch" },
'title': "Bearbeitungsdatum",
},
'achtung': {
'url': "/nav1/grafik/all/achtung-button.gif",
'alt': { "de": "!", "en": "important note" },
'title': { "de": "wichtiger Hinweis", "en": "important note" },
},
'neu': {
'url': {
"de": "/nav1/grafik/all/neu-button.gif",
"en": "/nav1/grafik/all/new-button.gif",
},
'alt': { "de": "Neu", "en": "new" },
'title': { "de": "neue Inhalte", "en": "new contents" },
},
'marker': {
'url': "/nav1/grafik/all/button-marker-15x15.gif",
'alt': { "de": "Hinweis", "en": "marker" },
'title': { "de": "siehe", "en": "listing" },
},
'marker_kl': {
'url': "/nav1/grafik/all/button-marker-13x13.gif",
'alt': { "de": "Hinweis", "en": "marker" },
'title': { "de": "siehe", "en": "listing" },
},
'frage': {
'url': "/nav1/grafik/all/frage-button.gif",
'alt': "?",
'title': { "de": "Hilfe", "en": "help" },
},
'fenster': {
'url': "/nav1/grafik/all/neuesfenster2.gif",
'alt': { "de": "in neuem Fenster", "en": "in a new window" },
'title': "in neuem Fenster",
},
'extern': {
'url': "/nav1/grafik/all/extern.png",
'alt': { "de": "externer Verweis", "en": "external link" },
'title': { "de": "externer Verweis", "en": "external link" },
},
'db_rot_i': {
'url': "/nav1/grafik/all/info-button.gif",
'alt': "i",
'title': {
"de": "eingeschr&auml;nkter Nutzerkreis, Information",
"en": "limited access, information",
}
},
'db_gruen_i': {
'url': "/nav1/grafik/all/info-button-gruen.gif",
'alt': "i",
'title': {
"de": "frei verf&uuml;gbar, Information",
"en": "free access, information",
}
},
'db_rot': {
'url': "/nav1/grafik/all/rot-button.gif",
'alt': {
"de": "roter Knopf",
"en": "red button",
},
'title': {
"de": "eingeschr&auml;nkter Nutzerkreis",
"en": "limited access",
},
},
'db_gruen': {
'url': "/nav1/grafik/all/gruen-button.gif",
'alt': {
"de": "gruener Knopf",
"en": "green button",
},
'title': {
"de": "frei verf&uuml;gbar",
"en": "free access",
},
},
'fahne_e': {
'url': "/nav1/grafik/all/uk.gif",
'alt': {
"de": "englisch",
"en": "english",
},
'title': {
"de": "englische Version",
"en": "english version",
},
},
'fahne_f': {
'url': "/nav1/grafik/all/france.gif",
'alt': {
"de": "franz&ouml;sisch",
"en": "french",
},
'title': {
"de": "franz&ouml,sische Version",
"en": "french version",
},
},
'fahne_d': {
'url': "/nav1/grafik/all/german.gif",
'alt': {
"de": "deutsch",
"en": "german",
},
'title': {
"de": "deutsche Version",
"en": "german version",
},
},
'fahne_i': {
'url': "/nav1/grafik/all/italy.gif",
'alt': { "de": "italienisch", "en": "italian" },
'title': {
"de": "italienische Version",
"en": "italian version"
},
},
'fahne_s': {
'url': "/nav1/grafik/all/spanien.gif",
'alt': { "de": "spanisch", "en": "spanish" },
'title': { "de": "spanische Version", "en": "spanish version" },
},
'marker_link': {
'url': "/nav1/grafik/all/markierung-button_kl.gif",
'alt': { "de": "Link", "en": "link" },
'title': {
"de": "Hinweis auf Link im Text",
"en": "reference",
}
},
'weiter': {
'url': "/nav1/grafik/all/weiter-button.gif",
'alt': { "de": "vor", "en": "forward" },
'title': {
"de": "zur n&auml;chsten Seite",
"en": "next page"
}
},
'euro': {
'url': "/nav1/grafik/all/euro.gif",
'alt': "Euro",
'title': {
"de": "Pay-per-Use-Angebot, Information",
"en": "per-per-use, information"
},
},
'email': {
'url': "/nav1/grafik/all/email_transp.gif",
'alt': { "de": "Brief", "en": "letter" },
'title': { "de": "E-Mail", "en": "mail" },
'align': "align='bottom'",
},
'oldbook': {
'url': "/nav1/grafik/all/oldbook.gif",
'alt': { "de": "Handschrift", "en": "manuscript" },
'title': { "de": "Handschrift", "en": "manuscript" },
'align': "align='middle'",
},
}

View File

@ -0,0 +1,48 @@
from markupsafe import Markup
#from jinja2 import Markup
def marginews(self, feed, heading, link, readMore, content_file, addLinks=None):
template = f"""
<div class="headlineRightColumn">
<a href="<% {feed} %>">
<img src="/bilder/rss.png" align="right" alt="RSS" />
</a>
<a href="{link}">{heading}</a>
</div>
<div class="boxRightColumn boxRightColumnStartseite">
"""
if isinstance(addLinks, list):
addLinks_str = '<div class="weiternews">'
for text, url in addLinks:
addLinks_str += f'<a href="{url}">{text}</a> &bull;'
addLinks_str += '</div>'
elif addLinks:
addLinks_str = ( '<!-- Error: addLinks is a ' + type(addLinks).__name__
+ ', needs to be list! -->' )
else:
addLinks_str = '<!-- No added links -->'
if readMore:
readMore = f"""
<div class="weiternews">
<a href="http://ub.blog.uni-heidelberg.de">
{self.i18n(de="Weiteres im UBlog", en="Earlier UBlog posts")}
&hellip;</a>
</div>
"""
else:
readMore = ''
readMore += '</div>'
try:
content_file = "/web/news/" + content_file + '.news'
with open(content_file, 'r') as content:
out_string = content.read()
except OSError as e:
return str(e)
else:
return Markup(template + out_string + addLinks_str + readMore)

View File

@ -0,0 +1,12 @@
import re
SNIPPETS = {}
with open('/web/etc/www-data/snippet.list') as snl:
for line in snl:
label, value = re.split(r'\s+', line, 1)
value = value.rstrip("\r\n")
SNIPPETS[label] = value
def snippet(self, name):
return SNIPPETS[name]

View File

@ -0,0 +1,7 @@
from markupsafe import Markup
#from jinja2 import Markup
def up(self):
return Markup("""
<a href="#up"><img style="display:block;float:right;width:39px;height:13px" alt="zum Seitenanfang" title="zum Seitenanfang" src="/nav4/grafik/layout/icon_top.gif"></a>
""")

View File

@ -0,0 +1 @@
echo "delete from timestamps where timestamp < datetime('now', '-1 DAY');" | sqlite3 /var/tmp/www/forms-sent-timestamps.db

11
flask/uwsgi.ini Normal file
View File

@ -0,0 +1,11 @@
[uwsgi]
socket = /opt/floritiv-web-site/floritiweb.sock
chmod-socket = 660
module = wsgi:app
processes = 5
uid = www-data
touch-reload = /opt/floritiv-web-site/.git/index
die-on-term=true
logto = /var/log/nginx/uwsgi/uwsgi-app.log
log-maxsize=100000000

6
flask/wsgi.py Normal file
View File

@ -0,0 +1,6 @@
from floritiweb import create_app
app = create_app()
if __name__ == '__main__':
app.run

20
requirements.txt Normal file
View File

@ -0,0 +1,20 @@
blinker==1.6.3
certifi==2024.2.2
charset-normalizer==3.3.2
click==8.1.7
defusedxml==0.7.1
deprecation==2.1.0
flask==3.0.0
idna==3.6
importlib-metadata==6.8.0
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.3
packaging==24.0
python-dotenv==1.0.0
pyotrs==1.4.0
requests==2.31.0
urllib3==2.2.1
uWSGI==2.0.22
werkzeug==3.0.0
zipp==3.17.0