Added files.

This commit is contained in:
Florian Heß 2025-05-10 09:38:36 +02:00
parent a61c0ae1ff
commit a5c74b0ba7
22 changed files with 1390 additions and 0 deletions

2
flask/.flaskenv Normal file
View File

@ -0,0 +1,2 @@
FLASK_APP=ubhd_flask
FLASK_ENV=development

11
flask/config.py Normal file
View File

@ -0,0 +1,11 @@
class Config:
DEBUG = False
TESTING = False
class ProductionConfig(Config):
SITE_URL = 'https://floritiv.de'
TEMPLATES_AUTO_RELOAD = True
class DevelopmentConfig(Config):
SITE_URL = 'https://floritiv.de'
DEBUG = True

View File

@ -0,0 +1,202 @@
import os, json, re
from glob import glob
from datetime import date
from jinja2 import pass_context
#from jinja2 import contextfilter
from flask import Flask, render_template, make_response, abort, redirect, send_from_directory, request
from .lib.dynamic_snippet_library import DynamicSnippetLibrary
TEMPLATES_DIR = '/web/htdocs'
def create_app(test_config=None):
app = Flask(
__name__,
instance_relative_config=True,
template_folder=TEMPLATES_DIR
)
if test_config is None:
if os.environ.get('FLASK_ENV') == 'development':
app.config.from_object("config.DevelopmentConfig")
else:
app.config.from_object("config.ProductionConfig")
else:
app.config.from_mapping(test_config)
try:
os.makedirs(app.instance_path)
except OSError:
pass
app.jinja_env.filters['DSL.i18n'] = locate_content
@app.route("/")
def home():
return redirect(app.config['SITE_URL'] + '/index.html', code=302)
@app.route("/<path:template>", methods=["GET","POST"])
def call_template(template):
if template.endswith('/Welcome.html'):
return redirect(app.config['SITE_URL'] + '/' + re.sub(r'[^/]+$', '', template), code=301)
m = re.search(r"([/.])[^/.]*$", template)
if m and m.group(1) == '/':
template = re.sub(r"/?$", "/index.jtml", template, count=1)
elif template.endswith('.html') and os.path.exists(
os.path.join(TEMPLATES_DIR, template)
):
pass # fall-through to upcoming send_from_directory()
else:
template = re.sub(r'h(?=tml$)', 'j', template)
if not template.endswith('.jtml'):
return send_from_directory(TEMPLATES_DIR, template)
m = re.match('([Ee]nglisc?h|[a-z]{2})/', template)
if m:
orig_lang = m.group(0)[:-1]
if 'nglis' in orig_lang:
# normalize
orig_lang = 'Englisch'
langcode = m.group(1)[:2].lower()
else:
langcode = ''
if langcode not in ('de', 'en', 'it', 'fr'):
orig_lang = ''
langcode = 'de'
elif m:
template = template.split('/', 1)[1]
layout = gather_layouts_in_ascending_directory_levels(template)
innermost_layout_name = 'layout.jtml'
next(layout)
def layout_finder(name=None):
nonlocal innermost_layout_name
if name is not None:
innermost_layout_name = name
return layout.send(innermost_layout_name)
template_lang, mtime_date, found_template = try_templates(template, langcode)
other_templates_rx = r'(?<=\.)\w\w(?=.jtml$)'
other_templates = re.sub(other_templates_rx, '*', found_template)
if '*' in other_templates:
other_templates = glob(os.path.join(TEMPLATES_DIR, other_templates))
available_la = []
for ot in other_templates:
if m := re.search(other_templates_rx, ot):
la = m.group(0)
else: continue
available_la.append(la)
if not('de' in available_la or 'en' in available_la):
raise RuntimeError(
"No other templates: Probed {}. de and en missing".
format(', '.join(available_la))
)
else:
available_la = None
def check_user_scope():
# find out if access comes from
# - Earth,
# - university campus (and if member or student)
# - or ourselves, Heidelberg University Library, IT dep.
ip_address = (
request.headers.getlist('X-Forwarded-For')[0]
or request.remote_addr
)
return ip_address
return render_template(found_template,
layout=layout_finder,
date=mtime_date,
DSL=DynamicSnippetLibrary(
root=app.config['SITE_URL'],
path=template.replace('.jtml', '.html'),
available_la=available_la,
orig_la=orig_lang,
requested_la=langcode,
found_la=template_lang,
passed_args=request.args,
passed_data=request.form
),
get_real_ip=check_user_scope,
)
@app.errorhandler(404)
def notfound_page(e):
return make_response(render_template(
'http404.jtml',
DSL=DynamicSnippetLibrary(
root=app.config['SITE_URL'],
path=request.path,
orig_la='',
requested_la='',
found_la='de',
)), 404
)
return app
def gather_layouts_in_ascending_directory_levels(template):
layout_name = yield
if template.endswith("/" + layout_name):
template, _ = os.path.split(template)
while True:
directory, _ = os.path.split(template)
if not len(directory.strip('/.')):
layout_name = yield layout_name
break
possible_template = os.path.join(directory, layout_name)
if os.path.exists(os.path.join(TEMPLATES_DIR, possible_template)):
layout_name = yield possible_template
template = directory
def try_templates(template, langcode):
tried = []
for lang in (langcode, 'en', 'de'):
suffixed = re.sub(r'(?=\.jtml$)', f'.{lang}', template)
path = os.path.join(TEMPLATES_DIR, suffixed)
if os.path.isfile(path):
template = suffixed
break
else:
tried.append(path)
else:
path = os.path.join(TEMPLATES_DIR, template)
lang = None
if not os.path.isfile(path):
abort(404)
last_update_date = date.fromtimestamp(os.path.getmtime(path)).strftime(
"%d.%m.%Y" )
return lang, last_update_date, template
@pass_context
#@contextfilter
def locate_content(ctx, content):
"""
Show elements with lang or xml:lang attributes, plus their contents,
only if the requested language matches.
Always show elements neither lang-specified nor contained in a
lang-specified ancestor element.
"""
lang_attr_value_rx = r'\blang=["\']?(\w\w)'
included = set((i, i) for i in re.findall(lang_attr_value_rx, content))
la = ctx['DSL'].i18n(**dict(included))
return re.sub(
r'<(\w+)[^>]+?' + lang_attr_value_rx + r'[^>]*>(.+?)</\1>',
lambda m: m.group(0) if m.group(2) == la else '',
content
)

Binary file not shown.

View File

@ -0,0 +1,132 @@
import json, re
from markupsafe import Markup
#from jinja2 import Markup
from werkzeug.exceptions import BadRequest
from importlib import import_module
class DynamicSnippetLibrary:
params_for = None
def __init__(self, root, path,
orig_la=None,
requested_la=None,
found_la=None,
available_la=None,
passed_args={},
passed_data={}
):
self.orig_la = orig_la
self.requested_la = requested_la
self.found_la = found_la
if available_la is None:
self.language_versions_available = []
else:
self.language_versions_available = available_la
self._root = root + '{LA}'
self.path = path
self.params = (passed_args, passed_data, {})
def change_root(self, new_root, strip=None):
m = re.match(r"\w+:", new_root)
mla = '{LA}' in new_root
if m and not mla:
new_root = new_root + '/{LA}'
if strip is None:
strip = new_root
if strip and self.path.startswith(strip):
self.path = self.path[ len(strip) : ]
self._root = new_root if m else self._root + '/' + new_root
return '<!-- Root: ' + self._root + '; orig_la=' + self.orig_la + ' -->'
def root(self, lang=None):
if lang is None:
lang = self.orig_la
if not lang:
return re.sub(r'\{LA\}/?', '', self._root)
else:
return self._root.replace('{LA}', lang)
def i18n(self, **langstr):
for lang in (self.requested_la, self.found_la, 'en', 'de'):
if lang in langstr:
return langstr[lang]
else:
return '???' + lang + '???'
from .snippets.marginews import marginews
from .snippets.img import img
from .snippets.email import email
from .snippets.up import up
from .snippets.snippet import snippet
# {% set ffg, success = DSL.get_form("mailform", ...) %}
# ...
# {{ ffg(name="subject") }}
def get_form(self, module, **initargs):
mod = import_module(re.sub(r"^\.*", "..forms.", module), __name__)
hp = mod.init(self.params[-1],initargs.pop("altfields",{}),initargs.pop("defaults",{}),**initargs)
is_sent = self.params[-1].pop('send',False)
if is_sent:
success = hp.validate_params(self.params[-1])
if success is True:
mod.process(hp)
error = False
else:
success = None
def form_field_generator(returntype, **kwargs):
# Parameterwert annehmen und validieren
field = hp.fields[kwargs["name"]]
if returntype == 'str':
return field.value
elif returntype == 'html':
if not field.is_valid:
error = field.error_msg
else:
error = False
return Markup(field.render(**kwargs) + (f"<span class='error'>{error}</span>" if error else ""))
# HTML-Feld darstellen.
# return MarkupSafe(...)
return form_field_generator, success
def virtuelleAusst(self): # Das ist doch ein Autohandler, oder?
pass
def flatten_meta(self, meta):
if isinstance(meta, dict):
metad = meta
else:
metad = json.loads(meta)
super_meta = metad.pop('', None)
if super_meta is None:
self.meta = metad
if not len(self.language_versions_available) \
and isinstance(metad['title'], dict):
self.language_versions_available = metad['title'].keys()
metad['title'] = self.i18n(**metad['title'])
return metad
super_meta.update(metad)
super_meta = self.flatten_meta(super_meta)
if 'allow_post_data' in super_meta:
for key in super_meta['allow_post_data']:
if key.endswith('+'):
getter = self.params[1].getlist
key = key[:-1]
else:
getter = self.params[1].get
if key in self.params[1]:
self.params[2][key] = getter(key)
if 'allow_url_params' in super_meta:
for key in super_meta['allow_url_params']:
if key.endswith('+'):
getter = self.params[0].getlist
key = key[:-1]
else:
getter = self.params[0].get
self.params[2].setdefault(key, getter(key))
return super_meta

1
flask/floritiweb/lib/forms/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__/

View File

@ -0,0 +1,215 @@
import re
from .field import Field, ValidationError
import sys
class HandleParams:
def __init__(
self, params={}, sequence=None, ignored_fields=None, defaults=None, altered_fields=None
):
self.sequence = list(sequence) if sequence else []
self.unrecognized = []
self.required_fields = set()
self.altered_fields = dict(altered_fields) if altered_fields else {}
self.defaults = dict(defaults) if defaults else {}
for key in self.defaults.keys():
if len(params.get(key,'') or '') > 0:
self.defaults[key] = params[key]
self.fields = {}
self.ignored_fields = set(ignored_fields or [])
def add_field(self, name, properties, **alter_cfg):
properties['name'] = name
variant = self.altered_fields.get(name)
if variant is not None:
if variant in alter_cfg:
cfg = alter_cfg[variant]
if not isinstance(cfg, dict):
raise RuntimeError(f'{name}: Not a dictionary for variant {variant}: {cfg}')
alter_cfg = dict(cfg)
else:
# A variant is requested that is not known / supported.
known_keys = ','.join(alter_cfg)
raise RuntimeError(f"Field-def {name}: No variant {variant} (keys: {known_keys})")
else:
alter_cfg = {}
for attr, acfg in alter_cfg.items() :
if attr.startswith('wrap_'):
attr = attr[5:]
wrap = True
else:
wrap = False
orig = properties.get(attr)
properties[attr] = lambda orig, *args, **kwargs: acfg(orig, *args, **kwargs) if wrap else acfg
if properties.pop('required', False) == True:
self.required_fields.add(name)
field = Field(**properties)
default = self.defaults.get(name)
if default is not None:
default = filter_value(field.apply_filters, name, default)
try:
if field.validate(default): field.value = default
except RuntimeError as e: # HandleParams.ValidationError raised spurious errors when
if len(default) > 0: # used flask is used with uwsgi
raise
if hasattr(field, 'digest_key') or field.multiple:
self.sequence.append(name)
self.fields[name] = field
def validate_params(self, params, set_invalid_value=True):
success = True
ignore = self.ignored_fields
validate_seq = [*self.sequence]
defer_validations = set(validate_seq)
tmp = {}
## raise RuntimeError(type(params))
for fname in self.required_fields:
if len(params.get(fname, '')) == 0:
params[fname] = None
fieldlist = open('/tmp/validated_fields.txt','w')
def paramsiter():
for fldname, fldvalue in params.items():
if isinstance(fldvalue, list):
for v in fldvalue:
yield fldname, v
else:
yield fldname, fldvalue
## raise RuntimeError(list(paramsiter()))
## raise RuntimeError(list(paramsiter()))
## raise RuntimeError(params.items())
## raise RuntimeError("defer_validations", list(defer_validations))
## if "foerderung" in defer_validations:
## raise RuntimeError("foerderung in defer_valitations.")
## else:
## raise RuntimeError("foerderung not in defer_validations.")
## for fldname, fldvalue in params.items():
## raise RuntimeError(params.items())
## if fldname == 'foerderung':
## raise RuntimeError("type", type(fldvalue), fldvalue)
## if fldname == 'foerderung' and len(fldvalue) > 1:
## raise RuntimeError(fldvalue)
for fldname, fldvalue in paramsiter():
## raise RuntimeError(fldname)
## if len(fldvalue) > 1:
## raise RuntimeError(fldvalue)
## if fldname == 'foerderung':
## raise RuntimeError(fldname + ' ' + fldvalue, type(fldvalue))
if fldvalue is not None and len(fldvalue) == 0:
continue
if ( fldname not in defer_validations
and fldname in self.fields
):
## raise RuntimeError(fldname)
## if fldname == 'foerderung':
## raise RuntimeError('test')
## raise RuntimeError(fldname + ' ' + fldvalue)
field = self.fields[fldname]
value = filter_value(field.apply_filters, fldname, fldvalue)
try:
this_success = field.validate(value)
## von debugging sessions
if fldname == 'dsgvochecked' and this_success is False:
## raise RuntimeError("this_success: " + this_success)
raise RuntimeError(value)
except HandleParams.ValidationError as e:
field.error_msg = str(e)
field.is_valid = False
print(fldname, file=fieldlist)
this_success = False
else:
field.is_valid = True
take_this_value = value is not None if set_invalid_value else this_success
if take_this_value:
field.value = value
if not(success and this_success):
success = False
elif not fldname in ignore:
#if hasattr(self.fields[fldname], 'digest_key'):
# raise RuntimeError(fldname)
#elif fldname.startswith("nachweise"):
# raise RuntimeError(fldname)
## raise RuntimeError("validate_seq: ", validate_seq)
for fname in validate_seq:
key_tmpl = {}
field = self.fields[fname]
if hasattr(field, 'digest_key'):
key_tmpl = field.digest_key(fldname)
if not key_tmpl: continue
value = filter_value(field.apply_filters, fname, fldvalue)
key_tmpl['value'] = value
value = key_tmpl
tmp.setdefault(fname, {})
slot = tmp[fname]
elif fldname == fname:
value = filter_value(field.apply_filters, fname, fldvalue)
slot = tmp
## raise RuntimeError("slot: ",slot)
else:
continue
if field.multiple:
slot.setdefault(fldname, [])
slot[fname].append(value)
## raise RuntimeError(fldname + ' ' + value)
else:
slot[fldname] = value
## raise RuntimeError(tmp)
fullfilled = False
for fldname, fldvalue in tmp.items():
field = self.fields[fldname]
try:
##raise RuntimeError("fldvalue2: ", fldvalue)
this_success = field.validate(fldvalue)
except HandleParams.ValidationError as e:
field.error_msg = str(e)
field.is_valid = False
this_success = False
else:
field.is_valid = True
take_this_value = fldvalue is not None if set_invalid_value else this_success
if take_this_value:
field.value = fldvalue
fullfilled = True
if not(success and this_success):
success = False
#if not fullfilled:
# self.unrecognized.append(fldname)
# continue
##raise RuntimeError(tmp)
return success
ADD_FILTERS = {}
def filter_value(filters_to_apply, name, value):
if isinstance(value, str):
value = re.sub(r'[\;\"\(\)\{\}\\\`\|\%\r\<\>\&\0]', '', value)
for fname in filters_to_apply:
if not fname in ADD_FILTERS:
raise RuntimeError(f"No filter {fname}")
if not isinstance(value, list):
value = [list]
for v in value:
try:
value = FILTERS[fname](v)
except HandleParams.ValidationError as e:
field.error_msg = str(e)
return
return value

View File

@ -0,0 +1,45 @@
CALLBACKS = {
'validate': lambda self, value: True,
'render': lambda self, *args, **kwargs: '',
'digest_key': None,
'rendervalue': lambda self, *args, **kwargs: self.value,
'initializer': None,
}
class Field:
def wrap_callback(self, cb):
if cb is None:
return
def func(*args, **kwargs):
return cb(self, *args, **kwargs)
return func
def __init__(self, *, is_multiple=False, **callbacks):
self.apply_filters = []
for cb in CALLBACKS.keys():
func = self.wrap_callback(
callbacks.get(cb, CALLBACKS[cb])
)
if func is not None:
setattr(self, cb, func)
self.initializer = callbacks.get('initializer', lambda: '')
#self.digest_key = callbacks.get('digest_key', None)
self.error_msg = ''
self.multiple = is_multiple
self.options_map = {}
self.is_valid = None
@property
def value(self):
if not hasattr(self, '_value'):
self._value = self.initializer()
return self._value
@value.setter
def value(self, value):
self._value = value
class ValidationError(RuntimeError):
pass

View File

@ -0,0 +1,93 @@
from datetime import datetime
from . import ValidationError
import hmac, os, sqlite3
DB_FILENAME = "/var/tmp/www/forms-sent-timestamps.db"
SECRET = b"Dieser Zeitstempel wurde von uns gepraegt. -- UB/fh 24.5.24"
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
MIN_DURATION = 10 # Sekunden
REFUSE_NTH_TIMESTAMP_REUSE = 3 # Diese Werte werden nur bei der Erstellung einer
MAX_REQUESTS_PER_IP = 10 # neuen SQLite3-Datenbank berücksichtigt.
def digest_of(value):
return hmac.new(SECRET, value.encode("utf-8"), "sha256").hexdigest()
def signed_fresh_timestamp_check(remote_ip, error=ValidationError):
def render_timestamp(self, name):
value = datetime.now().strftime(TIME_FORMAT)
value = "{}/{}".format(value, digest_of(value))
return f'<input name="{name}" type="hidden" value="{value}">'
def rendervalue(self, name):
value = datetime.now().strftime(TIME_FORMAT)
return "{}/{}".format(value, digest_of(value))
def timestamp_checker(field, value, remote_ip=remote_ip):
if value is None: raise error("not passed")
value, digest = value.split("/")
if not hmac.compare_digest(digest, digest_of(value)):
raise error("invalid")
timediff = datetime.now() - datetime.strptime(value, TIME_FORMAT)
db = init_db()
try:
c = db.cursor()
c.execute("""
INSERT OR IGNORE into timestamps (timestamp, digest, remote_ip) values (?, ?, ?)
""", (value, digest, remote_ip));
c.execute("UPDATE timestamps SET used=used+1 WHERE timestamp=? AND remote_ip=?",
(value, remote_ip)
)
db.commit()
except sqlite3.IntegrityError:
raise error("overused")
else:
if c.rowcount == 0:
raise error("missing")
elif timediff.days > 0:
raise error("outdated")
elif timediff.seconds < MIN_DURATION:
raise error(f"fast_as_spambot")
finally:
db.close()
return True
return {
"validate": timestamp_checker,
"render": render_timestamp,
"rendervalue": rendervalue,
"required": True
}
def init_db(filename=DB_FILENAME):
os.makedirs(os.path.dirname(filename), exist_ok=True)
conn = sqlite3.connect(filename)
if (not os.path.isfile(filename) or os.stat(filename).st_size == 0):
cursor = conn.cursor()
cursor.executescript(f"""
CREATE TABLE timestamps (
timestamp PRIMARY KEY,
digest,
remote_ip,
reqtime CURRENT_TIMESTAMP,
used DEFAULT 0 CHECK(used < {REFUSE_NTH_TIMESTAMP_REUSE})
);
CREATE TRIGGER block_hyperactive_ip
BEFORE INSERT ON timestamps
WHEN EXISTS (
SELECT 1
FROM timestamps
WHERE remote_ip=NEW.remote_ip
GROUP by remote_ip
HAVING count(timestamp)={MAX_REQUESTS_PER_IP}
)
BEGIN
SELECT RAISE(FAIL, "no more than {MAX_REQUESTS_PER_IP} requests per IP");
-- Still, cron'd housekeeping is required
END;
""")
conn.commit()
return conn

View File

@ -0,0 +1,237 @@
from .handle_params import HandleParams, timewindowcheck
from email.message import EmailMessage
from subprocess import Popen, PIPE
import sys
import datetime
from textwrap import TextWrapper
from io import StringIO
import os
aa = open('/web/etc/allowed.adress')
addresses = {}
for line in aa:
short, longname, mailAddress = line.split(";")
addresses[short] = (longname, mailAddress.strip())
try:
aa.close()
except AttributeError:
pass
class ValidationError(RuntimeError):
def __init__(self, message):
self.langs = { 'de': message, 'en': message }
def __str__(self):
return self.langs['de']
HandleParams.ValidationError = ValidationError
class MultiLangExc(ValidationError):
def __init__(self, **langs):
self.langs = langs
timewindowcheck.init_db()
def init(params, altfields, defaults, check_remote_ip='0.0.0.0'):
hp = HandleParams(altered_fields=altfields, params=params, defaults=defaults)
def render_text(field, **args):
value = field.value
attr = attributes_to_string(args)
return f'<input style="width:75%;" type="text" value="{value}" {attr}>'
def render_textfield(field,**args):
value = field.value
attr = attributes_to_string(args)
return f'<textarea style="width: 75%; min-height: 12em;" {attr}>{value}</textarea>'
def render_checkbox(field, **args):
attr = attributes_to_string(args)
return f'<input type="checkbox" {attr} >'
mandatory_text = { 'de': "obligatorisch", 'en': "mandatory" }
def render_local_time(field,**args):
return str(datetime.datetime.now())
def must_be_nonempty(field, value):
if value is None or len(value) == 0:
raise MultiLangExc(**mandatory_text)
return True
def must_be_nonempty_then_validate(inner_val, field, value):
if len(value) == 0:
raise MultiLangExc(**mandatory_text)
return inner_val(self, value)
def validate_honeypot_field(field,value):
if len(value):
raise ValidationError(
"Sie haben ein verstecktes Feld ausgefüllt"
"(Honigtopf für Spambots)"
)
return True
def pass_longname(field, name):
value = field.value
try:
longname = addresses[value][0]
except KeyError:
longname = "Non-existent Enterprises Ltd (will forward)"
value = "ub"
return longname + f"<input type='hidden' name='adr' value='{value}'>"
def check_is_within_last24h():
pass
bei_allen_feldern_mgl = {
"mandatory": {
"required": True,
"wrap_validate": must_be_nonempty_then_validate,
}
}
hp.add_field("realname", {
"render":render_text
},
**bei_allen_feldern_mgl
)
hp.add_field("subject", {
"render":render_text,
"required": True,
})
hp.add_field("name", {
"validate": validate_honeypot_field,
"render":render_text
},
)
hp.add_field("body", {
"render":render_textfield,
"required": True,
})
hp.add_field("mailadr", {
"render":render_text
})
hp.add_field("wohnort", {
"render":render_text
})
hp.add_field("adr", {
"render":pass_longname
})
def twc_error(which):
msgs = {
"not passed": {
"de": "Spamprüfung gescheitert: kein timewindowcheck-Wert übergeben",
"en": "Spam detection failed: No field 'timewindowcheck' passed"
},
"invalid": { "de": "Zeitstempel und Digest ungültig", "en": "Timestamp and digest invalid" },
"missing": { "de": "Zeitstempel oder IP-Adresse nicht ermittelbar", "en": "Timestamp or IP address not indicated" },
"overused": { "de": "Der Zeitstempel wurde zu häufig verwendet", "en": "The timestamp has been used too often" },
"outdated": { "de": "Die Formularabfrage ist zu alt. Bitte laden Sie die Seite neu.", "en": "Form request is too old. Please reload the page." },
"fast_as_spambot": { "de": "Sie sind so schnell wie ein Spambot (oder schneller). Bitte lassen Sie sich vor dem Senden etwas mehr Zeit.", "en": "You are as fast as a spambot (or faster). Please have a breath or two before sending." },
}
return MultiLangExc(**msgs[which])
hp.add_field(
'timewindowcheck',
timewindowcheck.signed_fresh_timestamp_check(check_remote_ip, twc_error)
)
hp.add_field("dsgvochecked", {
"render":render_checkbox,
"rendervalue":render_local_time,
"required": True,
"validate": must_be_nonempty
})
return hp
def process(hp):
form = {
"adr": "adr",
"subject": "subject",
"realname": "realname",
"body": "body",
"ort": "wohnort",
"mymail": "mailadr",
"dsgvo_ts": "dsgvochecked",
}
for k,v in form.items():
form[k] = hp.fields[v].rendervalue() or hp.fields[v].value
adresse_formular = addresses[ hp.fields["adr"].value ][1]
# Mail bzw. Request rausschicken
send_mail(form, adresse_formular)
return form
def attributes_to_string(args):
attrs = []
for key, value in args.items():
attrs.append(f'{key}="{value}"')
return " ".join(attrs)
def send_mail(data, adr):
_w = TextWrapper(width=67, subsequent_indent=" "*13)
def wrap(label, slot):
_w.initial_indent = label+" "*(11-len(label))+': '
return _w.wrap(data[slot])
form = {
"Adressat": "adr",
"Betreff": "subject",
"Inhalt": "body",
"Absender": "realname",
"Wohnort": "ort",
"Antwort an": "mymail",
"DSE z.k.g.": "dsgvo_ts"
}
with open('/tmp/data_test.txt', 'w') as outfile:
outfile.write(str(data))
userinput = list()
for label,slot in form.items():
userinput.append('\n'.join(wrap(label,slot)))
text = ('' if data['mymail'] else """
-----------------------------------------------------------------
Bitte nicht die REPLY-Funktion verwenden. Es wurde keine Absende-
adresse angegeben.
Dieser Hinweis kann geloescht werden.
-----------------------------------------------------------------
""") + "\n".join(userinput)
mail = EmailMessage()
mail['From'] = 'WWW-Formular <noreply@example.invalid'
#mail['From'] = data['mymail'] or 'WWW-Formular'
mail['To'] = adr
del mail['Subject']
mail['Subject'] = data['subject']
mail.add_header('Content-Type', 'text/plain;charset=utf-8')
mail.set_payload(text)
mail_string = str(mail)
sml = Popen(['/usr/sbin/sendmail', '-t', '-oi'], stdin=PIPE)
#err = sml.communicate(mail.as_bytes(policy=mail.policy.clone(linesep='\r\n')))
with open('/var/tmp/mail_string_encode.txt', 'w') as file:
print(mail_string, file=file)
err = sml.communicate(mail_string.encode('utf8'))

View File

@ -0,0 +1,37 @@
from handle_params import HandleParams, ValidationError
import pytest, re
def regex_validator(regex, error_message):
def _validate(_, value):
if len(value) > 0:
m = re.search(regex, value)
else:
return
if m:
return value
else:
raise ValidationError(error_message)
return _validate
def test_hp_init():
hp = HandleParams()
def test_hp_with_text_field():
hp = HandleParams()
hp.add_field('name', {
'validate': regex_validator(
r'^([A-Z]\S+\.?) ([A-Z]\S+\.? )?([A-Z]\S+\.?)$',
'Does not look like a name'
)
})
hp.validate_params({
'name': 'Florian Heß'
})
assert hp.fields['name'].value == 'Florian Heß'
with pytest.raises(ValidationError):
assert hp.validate_params({ 'name': '123' })

View File

@ -0,0 +1 @@
forms/handle_params

View File

@ -0,0 +1,28 @@
Snippets
========
Hier enthalten sind die Python-Versionen der Snippets, die bei Mason unter /nav1/...
enthalten waren.
Alte Notation
-------------
<& /nav1/snippetname, arg1 => "value1", arg2 => "value2" &>
Neue Notation
-------------
{{ DSL.snippetname(arg1="value1", arg2="value2") }}
DSL steht für "DynamicSnippetLibrary" und enthält neben den Snippets, die jeweils
in ihren eigenen Dateien stehen (ggf. mit etwaigen Datenstrukturen, die statisch
oder dynamisch bei Serverstart befüllt werden), auch:
* `root`: Gibt die Seitenwurzel (URL) ggf. mit Sprachpräfix zurück. Eignet sich für Menüs.
* `i18n`: Argumente mit den Namen eines Sprachpräfix (de, en, etc.). In Abhängigkeit von der
aktuellen Sprache wird nur der entsprechende String angezeigt. Verwendet in Layouts, Snippets
und Seiten ohne Sprachpräfix, da sie wenig Texte enthalten.

View File

@ -0,0 +1,79 @@
from markupsafe import Markup
#from jinja2 import Markup
mail_registry = {}
def email( self,
an=None,
kuerzel=None,
name=None,
body=None,
subject=None,
title_include_email=False
):
k = an or kuerzel
if k not in mail_registry:
k = 'ub'
kuerzel = mail_registry[k]
realname = kuerzel[0]
if name == '{@}':
masked_addr = kuerzel[1].replace('@', """
<img src="/bilder/inOCRable_at-sign.png
title="Klammeraffe" style="height:1em;position:relative;top:2px;right:1px;"
alt="&#64;"
/>
""")
name = ( masked_addr[0] +
'<span class="unsichtbar" title="Hierüber sollen Adress-Sammelprogramme stolpern, '
'Sie stolpern hoffentlich nicht:">LÖSCHE-DIESE-SP@MVERHINDERUNG</span>'
+ masked_addr[1:]
)
if name is None:
name = realname
if title_include_email is True:
realname += f'&lt;{kuerzel[1]}&gt;'
return (
# Markup(f'<a href="{self.root()}kontakt/email.html?adr={k}')
Markup(f'<a href="mailto:{kuerzel[1]}"')
+ (Markup('&subject=') + subject if subject is not None else '')
+ (Markup("&body=") + body if body is not None else '')
+ Markup(f'" title="mit Ihrem E-Mail-Programm eine E-Mail an {realname} senden">{self.img(name="email")} {name}</a>')
)
with open("/web/etc/allowed.adress") as fh:
for line in fh:
line = line.rstrip("\r\n")
kuerzel, name, mail = line.split(";")
mail_registry[kuerzel] = (name, mail)
#
# open my $fh, '<', "/web/etc/allowed.adress" or die $!;
#
# while ( my $line = <$fh> ) {
# chomp $line;
# my ($kuerzel,$name,$mail) = split /;/, $line;
# $kuerzel{$kuerzel} = [$name, $mail];
# }
#
# </%class>
#
# <%perl>
# my $k = $.an || $.kuerzel;
# my $kuerzel = $kuerzel{$k} || $kuerzel{$k='ub'};
# my $realname = $kuerzel->[0];
# if ( ($.name//"") eq "{@}" ) {
# (my $masked_adr = $kuerzel->[1] )
# =~ s{@}{<img src="/bilder/inOCRable_at-sign.png" title="Klammeraffe" style="height:1em;position:relative;top:2px;right:1px;" alt="&#64;" />};
# substr($masked_adr, 1, 0, q{<span class="unsichtbar" title="Hierüber sollen Adress-Sammelprogramme stolpern, Sie stolpern hoffentlich nicht:">LÖSCHE-DIESE-SP@MVERHINDERUNG</span>});
# $.name($masked_adr);
# }
# my $name = $.name || $realname;
# $realname .= sprintf(' &lt;%s&gt;', $kuerzel->[1]) if $.title_include_email;
# </%perl>
#
# <a href="<% $m->page->root() %>/kontakt/email.html?adr=<% $k %>;subject=<% $.subject %>;body=<% $.body %>" title="zum Formular, um E-Mail an <% $realname %> zu senden"><& /nav1/img, name => 'email' &><% $name %></a>

View File

@ -0,0 +1,202 @@
from markupsafe import Markup
#from jinja2 import Markup
def img(self, name):
attrs = dict(IMG_INVENTORY[name])
for attr, value in attrs.items():
try:
attrs[attr] = self.i18n(**value)
except TypeError:
pass
attrs['src'] = attrs.pop('url')
attrs = ' '.join([ f'{a}="{v}"' for a, v in attrs.items() ])
return Markup(f'<img {attrs} />')
IMG_INVENTORY = {
'info': {
'url': "/nav1/grafik/all/info-button.gif",
'alt': "i",
'title': {
"de": "weitere Informationen",
"en": "further information",
}
},
'zurueck': {
'url': "/nav1/grafik/all/zurueck-button.gif",
'alt': { "de": "zur&uuml;ck", 'en': "back" },
'title': { "de": "eine Seite zur&uuml;ck", 'en': "back" },
},
'top': {
'url': "/nav1/grafik/all/top-button.gif",
'alt': { "de": "hoch", "en": "top of page" },
'title': { "de": "zum Seitenanfang", "en": "top of page" }
},
'uhr': {
'url': "/nav1/grafik/all/uhr-button.gif",
'alt': { "de": "Uhr", "en": "watch" },
'title': "Bearbeitungsdatum",
},
'achtung': {
'url': "/nav1/grafik/all/achtung-button.gif",
'alt': { "de": "!", "en": "important note" },
'title': { "de": "wichtiger Hinweis", "en": "important note" },
},
'neu': {
'url': {
"de": "/nav1/grafik/all/neu-button.gif",
"en": "/nav1/grafik/all/new-button.gif",
},
'alt': { "de": "Neu", "en": "new" },
'title': { "de": "neue Inhalte", "en": "new contents" },
},
'marker': {
'url': "/nav1/grafik/all/button-marker-15x15.gif",
'alt': { "de": "Hinweis", "en": "marker" },
'title': { "de": "siehe", "en": "listing" },
},
'marker_kl': {
'url': "/nav1/grafik/all/button-marker-13x13.gif",
'alt': { "de": "Hinweis", "en": "marker" },
'title': { "de": "siehe", "en": "listing" },
},
'frage': {
'url': "/nav1/grafik/all/frage-button.gif",
'alt': "?",
'title': { "de": "Hilfe", "en": "help" },
},
'fenster': {
'url': "/nav1/grafik/all/neuesfenster2.gif",
'alt': { "de": "in neuem Fenster", "en": "in a new window" },
'title': "in neuem Fenster",
},
'extern': {
'url': "/nav1/grafik/all/extern.png",
'alt': { "de": "externer Verweis", "en": "external link" },
'title': { "de": "externer Verweis", "en": "external link" },
},
'db_rot_i': {
'url': "/nav1/grafik/all/info-button.gif",
'alt': "i",
'title': {
"de": "eingeschr&auml;nkter Nutzerkreis, Information",
"en": "limited access, information",
}
},
'db_gruen_i': {
'url': "/nav1/grafik/all/info-button-gruen.gif",
'alt': "i",
'title': {
"de": "frei verf&uuml;gbar, Information",
"en": "free access, information",
}
},
'db_rot': {
'url': "/nav1/grafik/all/rot-button.gif",
'alt': {
"de": "roter Knopf",
"en": "red button",
},
'title': {
"de": "eingeschr&auml;nkter Nutzerkreis",
"en": "limited access",
},
},
'db_gruen': {
'url': "/nav1/grafik/all/gruen-button.gif",
'alt': {
"de": "gruener Knopf",
"en": "green button",
},
'title': {
"de": "frei verf&uuml;gbar",
"en": "free access",
},
},
'fahne_e': {
'url': "/nav1/grafik/all/uk.gif",
'alt': {
"de": "englisch",
"en": "english",
},
'title': {
"de": "englische Version",
"en": "english version",
},
},
'fahne_f': {
'url': "/nav1/grafik/all/france.gif",
'alt': {
"de": "franz&ouml;sisch",
"en": "french",
},
'title': {
"de": "franz&ouml,sische Version",
"en": "french version",
},
},
'fahne_d': {
'url': "/nav1/grafik/all/german.gif",
'alt': {
"de": "deutsch",
"en": "german",
},
'title': {
"de": "deutsche Version",
"en": "german version",
},
},
'fahne_i': {
'url': "/nav1/grafik/all/italy.gif",
'alt': { "de": "italienisch", "en": "italian" },
'title': {
"de": "italienische Version",
"en": "italian version"
},
},
'fahne_s': {
'url': "/nav1/grafik/all/spanien.gif",
'alt': { "de": "spanisch", "en": "spanish" },
'title': { "de": "spanische Version", "en": "spanish version" },
},
'marker_link': {
'url': "/nav1/grafik/all/markierung-button_kl.gif",
'alt': { "de": "Link", "en": "link" },
'title': {
"de": "Hinweis auf Link im Text",
"en": "reference",
}
},
'weiter': {
'url': "/nav1/grafik/all/weiter-button.gif",
'alt': { "de": "vor", "en": "forward" },
'title': {
"de": "zur n&auml;chsten Seite",
"en": "next page"
}
},
'euro': {
'url': "/nav1/grafik/all/euro.gif",
'alt': "Euro",
'title': {
"de": "Pay-per-Use-Angebot, Information",
"en": "per-per-use, information"
},
},
'email': {
'url': "/nav1/grafik/all/email_transp.gif",
'alt': { "de": "Brief", "en": "letter" },
'title': { "de": "E-Mail", "en": "mail" },
'align': "align='bottom'",
},
'oldbook': {
'url': "/nav1/grafik/all/oldbook.gif",
'alt': { "de": "Handschrift", "en": "manuscript" },
'title': { "de": "Handschrift", "en": "manuscript" },
'align': "align='middle'",
},
}

View File

@ -0,0 +1,48 @@
from markupsafe import Markup
#from jinja2 import Markup
def marginews(self, feed, heading, link, readMore, content_file, addLinks=None):
template = f"""
<div class="headlineRightColumn">
<a href="<% {feed} %>">
<img src="/bilder/rss.png" align="right" alt="RSS" />
</a>
<a href="{link}">{heading}</a>
</div>
<div class="boxRightColumn boxRightColumnStartseite">
"""
if isinstance(addLinks, list):
addLinks_str = '<div class="weiternews">'
for text, url in addLinks:
addLinks_str += f'<a href="{url}">{text}</a> &bull;'
addLinks_str += '</div>'
elif addLinks:
addLinks_str = ( '<!-- Error: addLinks is a ' + type(addLinks).__name__
+ ', needs to be list! -->' )
else:
addLinks_str = '<!-- No added links -->'
if readMore:
readMore = f"""
<div class="weiternews">
<a href="http://ub.blog.uni-heidelberg.de">
{self.i18n(de="Weiteres im UBlog", en="Earlier UBlog posts")}
&hellip;</a>
</div>
"""
else:
readMore = ''
readMore += '</div>'
try:
content_file = "/web/news/" + content_file + '.news'
with open(content_file, 'r') as content:
out_string = content.read()
except OSError as e:
return str(e)
else:
return Markup(template + out_string + addLinks_str + readMore)

View File

@ -0,0 +1,12 @@
import re
SNIPPETS = {}
with open('/web/etc/www-data/snippet.list') as snl:
for line in snl:
label, value = re.split(r'\s+', line, 1)
value = value.rstrip("\r\n")
SNIPPETS[label] = value
def snippet(self, name):
return SNIPPETS[name]

View File

@ -0,0 +1,7 @@
from markupsafe import Markup
#from jinja2 import Markup
def up(self):
return Markup("""
<a href="#up"><img style="display:block;float:right;width:39px;height:13px" alt="zum Seitenanfang" title="zum Seitenanfang" src="/nav4/grafik/layout/icon_top.gif"></a>
""")

View File

@ -0,0 +1 @@
echo "delete from timestamps where timestamp < datetime('now', '-1 DAY');" | sqlite3 /var/tmp/www/forms-sent-timestamps.db

11
flask/uwsgi.ini Normal file
View File

@ -0,0 +1,11 @@
[uwsgi]
socket = /opt/floritiv-web-site/floritiweb.sock
chmod-socket = 660
module = wsgi:app
processes = 5
uid = www-data
touch-reload = /opt/floritiv-web-site/.git/index
die-on-term=true
logto = /var/log/nginx/uwsgi/uwsgi-app.log
log-maxsize=100000000

6
flask/wsgi.py Normal file
View File

@ -0,0 +1,6 @@
from floritiweb import create_app
app = create_app()
if __name__ == '__main__':
app.run

20
requirements.txt Normal file
View File

@ -0,0 +1,20 @@
blinker==1.6.3
certifi==2024.2.2
charset-normalizer==3.3.2
click==8.1.7
defusedxml==0.7.1
deprecation==2.1.0
flask==3.0.0
idna==3.6
importlib-metadata==6.8.0
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.3
packaging==24.0
python-dotenv==1.0.0
pyotrs==1.4.0
requests==2.31.0
urllib3==2.2.1
uWSGI==2.0.22
werkzeug==3.0.0
zipp==3.17.0