feat/app-factory #4
@@ -64,3 +64,8 @@ Nach dem Absenden eines Anmeldeformulars wird eine vCard 4.0 im Ordner `vcards/`
|
||||
```
|
||||
Nachname_Vorname_<id>.vcf
|
||||
```
|
||||
|
||||
Hinweis zur Chat-Zusammenfassung
|
||||
--------------------------------
|
||||
|
||||
Die Datei `docs/CHAT_SUMMARY.md` enthält eine lokale Zusammenfassung unserer Chat-Sitzungen und wird bewusst nicht versioniert (sie ist in `.gitignore` eingetragen). Du findest die Datei lokal unter `docs/CHAT_SUMMARY.md`.
|
||||
|
||||
153
app.py
153
app.py
@@ -1,149 +1,26 @@
|
||||
from flask import Flask, render_template, request, redirect, url_for
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
"""Application entry-point wrapper.
|
||||
|
||||
This module keeps the previous API (importing `app`, `db`, `Frage` from
|
||||
the top-level `app` module) while delegating the real implementation to
|
||||
the `application` package's factory.
|
||||
"""
|
||||
|
||||
from application import create_app
|
||||
from application.extensions import db
|
||||
from application.models import Adresse, Frage, Antwort
|
||||
import os
|
||||
import re
|
||||
from utils import generate_vcard
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from flask_migrate import Migrate
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
DB_PATH = os.path.join(BASE_DIR, 'anmeldung.db')
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
|
||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||||
|
||||
db = SQLAlchemy(app)
|
||||
# Flask-Migrate (Alembic) integration
|
||||
migrate = Migrate(app, db)
|
||||
|
||||
# --- logging setup -------------------------------------------------
|
||||
LOG_DIR = os.path.join(BASE_DIR, 'logs')
|
||||
os.makedirs(LOG_DIR, exist_ok=True)
|
||||
log_file = os.path.join(LOG_DIR, 'app.log')
|
||||
handler = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3)
|
||||
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s')
|
||||
handler.setFormatter(formatter)
|
||||
handler.setLevel(logging.INFO)
|
||||
app.logger.addHandler(handler)
|
||||
app.logger.setLevel(logging.INFO)
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
|
||||
class Adresse(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
vorname = db.Column(db.String(100), nullable=False)
|
||||
nachname = db.Column(db.String(100), nullable=False)
|
||||
strasse = db.Column(db.String(200), nullable=False)
|
||||
hausnummer = db.Column(db.String(50), nullable=True)
|
||||
plz = db.Column(db.String(20), nullable=False)
|
||||
ort = db.Column(db.String(100), nullable=False)
|
||||
land = db.Column(db.String(50), default='Deutschland')
|
||||
telefon_vorwahl = db.Column(db.String(20))
|
||||
telefon_nummer = db.Column(db.String(50))
|
||||
email = db.Column(db.String(200))
|
||||
|
||||
|
||||
class Frage(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
text = db.Column(db.String(500), nullable=False)
|
||||
|
||||
|
||||
class Antwort(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
adresse_id = db.Column(db.Integer, db.ForeignKey('adresse.id'), nullable=False)
|
||||
frage_id = db.Column(db.Integer, db.ForeignKey('frage.id'), nullable=False)
|
||||
text = db.Column(db.String(1000), nullable=True)
|
||||
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
|
||||
def index():
|
||||
if request.method == 'POST':
|
||||
# Adresse speichern
|
||||
vorname = request.form.get('vorname', '').strip()
|
||||
nachname = request.form.get('nachname', '').strip()
|
||||
strasse = request.form.get('strasse', '').strip()
|
||||
hausnummer = request.form.get('hausnummer', '').strip()
|
||||
plz = request.form.get('plz', '').strip()
|
||||
ort = request.form.get('ort', '').strip()
|
||||
land = request.form.get('land', 'Deutschland').strip()
|
||||
telefon_vorwahl = request.form.get('telefon_vorwahl', '').strip()
|
||||
telefon_nummer = request.form.get('telefon_nummer', '').strip()
|
||||
email = request.form.get('email', '').strip()
|
||||
# server-side E-Mail Validierung (einfache Prüfung)
|
||||
errors = {}
|
||||
email_re = re.compile(r"[^@]+@[^@]+\.[^@]+")
|
||||
if email:
|
||||
if not email_re.match(email):
|
||||
errors['email'] = 'Ungültige E-Mail-Adresse'
|
||||
# PLZ Validierung: genau 5 Ziffern
|
||||
if plz:
|
||||
if not re.fullmatch(r"\d{5}", plz):
|
||||
errors['plz'] = 'Postleitzahl muss genau 5 Ziffern haben'
|
||||
|
||||
if errors:
|
||||
fragen = Frage.query.all()
|
||||
# pass form data back to template so fields are preserved
|
||||
form = request.form.to_dict()
|
||||
return render_template('index.html', fragen=fragen, errors=errors, form=form)
|
||||
|
||||
adresse = Adresse(
|
||||
vorname=vorname,
|
||||
nachname=nachname,
|
||||
strasse=strasse,
|
||||
hausnummer=hausnummer,
|
||||
plz=plz,
|
||||
ort=ort,
|
||||
land=land,
|
||||
telefon_vorwahl=telefon_vorwahl,
|
||||
telefon_nummer=telefon_nummer,
|
||||
email=email,
|
||||
)
|
||||
db.session.add(adresse)
|
||||
db.session.commit()
|
||||
|
||||
# Antworten speichern
|
||||
fragen = Frage.query.all()
|
||||
for frage in fragen:
|
||||
key = f'frage_{frage.id}'
|
||||
antwort_text = request.form.get(key, '').strip()
|
||||
antwort = Antwort(adresse_id=adresse.id, frage_id=frage.id, text=antwort_text)
|
||||
db.session.add(antwort)
|
||||
db.session.commit()
|
||||
|
||||
# vCard 4.0 erzeugen und speichern
|
||||
try:
|
||||
# determine base dir: prefer app.config, then app attribute, then module BASE_DIR
|
||||
base_dir = app.config.get('BASE_DIR') if app.config.get('BASE_DIR') else getattr(app, 'BASE_DIR', BASE_DIR)
|
||||
generate_vcard(adresse, base_dir)
|
||||
except Exception as e:
|
||||
# Log the exception with stack trace but don't abort the request
|
||||
app.logger.exception('Fehler beim Erzeugen der vCard for adresse id=%s: %s', adresse.id if hasattr(adresse, 'id') else 'unknown', e)
|
||||
|
||||
# Nach erfolgreichem Speichern weiterleiten
|
||||
return redirect(url_for('danke', id=adresse.id))
|
||||
|
||||
# GET: Formular anzeigen
|
||||
fragen = Frage.query.all()
|
||||
return render_template('index.html', fragen=fragen)
|
||||
|
||||
|
||||
@app.route('/danke')
|
||||
def danke():
|
||||
id = request.args.get('id')
|
||||
# Use session.get to avoid SQLAlchemy LegacyAPIWarning and cast id to int if present
|
||||
adresse = db.session.get(Adresse, int(id)) if id else None
|
||||
return render_template('danke.html', adresse=adresse)
|
||||
# Create the Flask app using the factory
|
||||
app = create_app()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# In development only: if there is no migrations directory, create tables automatically.
|
||||
# In all other cases prefer using Flask-Migrate (flask db upgrade).
|
||||
migrations_dir = os.path.join(BASE_DIR, 'migrations')
|
||||
migrations_dir = os.path.join(app.BASE_DIR, 'migrations')
|
||||
if not os.path.exists(migrations_dir):
|
||||
app.logger.info('No migrations directory found; creating database tables with db.create_all()')
|
||||
db.create_all()
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
else:
|
||||
app.logger.info('Migrations directory present; please use "flask db upgrade" to update the database schema')
|
||||
app.run(debug=True)
|
||||
|
||||
43
application/__init__.py
Normal file
43
application/__init__.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import os
|
||||
from flask import Flask
|
||||
from .extensions import db, migrate
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
|
||||
def create_app(config=None):
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
# point templates/static to repository-level folders for compatibility
|
||||
project_root = os.path.dirname(BASE_DIR)
|
||||
app = Flask(__name__, template_folder=os.path.join(project_root, 'templates'), static_folder=os.path.join(project_root, 'static'), instance_relative_config=True)
|
||||
# default config
|
||||
DB_PATH = os.path.join(os.path.dirname(BASE_DIR), 'anmeldung.db')
|
||||
app.config.setdefault('SQLALCHEMY_DATABASE_URI', f'sqlite:///{DB_PATH}')
|
||||
app.config.setdefault('SQLALCHEMY_TRACK_MODIFICATIONS', False)
|
||||
|
||||
if config:
|
||||
app.config.update(config)
|
||||
|
||||
# init extensions
|
||||
db.init_app(app)
|
||||
migrate.init_app(app, db)
|
||||
|
||||
# logging setup (mirror previous behavior)
|
||||
LOG_DIR = os.path.join(os.path.dirname(BASE_DIR), 'logs')
|
||||
os.makedirs(LOG_DIR, exist_ok=True)
|
||||
log_file = os.path.join(LOG_DIR, 'app.log')
|
||||
handler = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3)
|
||||
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s')
|
||||
handler.setFormatter(formatter)
|
||||
handler.setLevel(logging.INFO)
|
||||
app.logger.addHandler(handler)
|
||||
app.logger.setLevel(logging.INFO)
|
||||
|
||||
# register blueprints
|
||||
from .routes import bp as public_bp
|
||||
app.register_blueprint(public_bp)
|
||||
|
||||
# expose BASE_DIR on app for compatibility
|
||||
app.BASE_DIR = os.path.dirname(BASE_DIR)
|
||||
|
||||
return app
|
||||
5
application/extensions.py
Normal file
5
application/extensions.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_migrate import Migrate
|
||||
|
||||
db = SQLAlchemy()
|
||||
migrate = Migrate()
|
||||
27
application/models.py
Normal file
27
application/models.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from .extensions import db
|
||||
|
||||
|
||||
class Adresse(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
vorname = db.Column(db.String(100), nullable=False)
|
||||
nachname = db.Column(db.String(100), nullable=False)
|
||||
strasse = db.Column(db.String(200), nullable=False)
|
||||
hausnummer = db.Column(db.String(50), nullable=True)
|
||||
plz = db.Column(db.String(20), nullable=False)
|
||||
ort = db.Column(db.String(100), nullable=False)
|
||||
land = db.Column(db.String(50), default='Deutschland')
|
||||
telefon_vorwahl = db.Column(db.String(20))
|
||||
telefon_nummer = db.Column(db.String(50))
|
||||
email = db.Column(db.String(200))
|
||||
|
||||
|
||||
class Frage(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
text = db.Column(db.String(500), nullable=False)
|
||||
|
||||
|
||||
class Antwort(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
adresse_id = db.Column(db.Integer, db.ForeignKey('adresse.id'), nullable=False)
|
||||
frage_id = db.Column(db.Integer, db.ForeignKey('frage.id'), nullable=False)
|
||||
text = db.Column(db.String(1000), nullable=True)
|
||||
77
application/routes.py
Normal file
77
application/routes.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from flask import Blueprint, render_template, request, redirect, url_for, current_app
|
||||
from .extensions import db
|
||||
from .models import Adresse, Frage, Antwort
|
||||
from utils import generate_vcard
|
||||
import re
|
||||
|
||||
bp = Blueprint('public', __name__)
|
||||
|
||||
|
||||
@bp.route('/', methods=['GET', 'POST'])
|
||||
def index():
|
||||
if request.method == 'POST':
|
||||
vorname = request.form.get('vorname', '').strip()
|
||||
nachname = request.form.get('nachname', '').strip()
|
||||
strasse = request.form.get('strasse', '').strip()
|
||||
hausnummer = request.form.get('hausnummer', '').strip()
|
||||
plz = request.form.get('plz', '').strip()
|
||||
ort = request.form.get('ort', '').strip()
|
||||
land = request.form.get('land', 'Deutschland').strip()
|
||||
telefon_vorwahl = request.form.get('telefon_vorwahl', '').strip()
|
||||
telefon_nummer = request.form.get('telefon_nummer', '').strip()
|
||||
email = request.form.get('email', '').strip()
|
||||
|
||||
errors = {}
|
||||
email_re = re.compile(r"[^@]+@[^@]+\.[^@]+")
|
||||
if email:
|
||||
if not email_re.match(email):
|
||||
errors['email'] = 'Ungültige E-Mail-Adresse'
|
||||
if plz:
|
||||
if not re.fullmatch(r"\d{5}", plz):
|
||||
errors['plz'] = 'Postleitzahl muss genau 5 Ziffern haben'
|
||||
|
||||
if errors:
|
||||
fragen = Frage.query.all()
|
||||
form = request.form.to_dict()
|
||||
return render_template('index.html', fragen=fragen, errors=errors, form=form)
|
||||
|
||||
adresse = Adresse(
|
||||
vorname=vorname,
|
||||
nachname=nachname,
|
||||
strasse=strasse,
|
||||
hausnummer=hausnummer,
|
||||
plz=plz,
|
||||
ort=ort,
|
||||
land=land,
|
||||
telefon_vorwahl=telefon_vorwahl,
|
||||
telefon_nummer=telefon_nummer,
|
||||
email=email,
|
||||
)
|
||||
db.session.add(adresse)
|
||||
db.session.commit()
|
||||
|
||||
fragen = Frage.query.all()
|
||||
for frage in fragen:
|
||||
key = f'frage_{frage.id}'
|
||||
antwort_text = request.form.get(key, '').strip()
|
||||
antwort = Antwort(adresse_id=adresse.id, frage_id=frage.id, text=antwort_text)
|
||||
db.session.add(antwort)
|
||||
db.session.commit()
|
||||
|
||||
try:
|
||||
base_dir = current_app.config.get('BASE_DIR') if current_app.config.get('BASE_DIR') else getattr(current_app, 'BASE_DIR', '.')
|
||||
generate_vcard(adresse, base_dir)
|
||||
except Exception as e:
|
||||
current_app.logger.exception('Fehler beim Erzeugen der vCard for adresse id=%s: %s', adresse.id if hasattr(adresse, 'id') else 'unknown', e)
|
||||
|
||||
return redirect(url_for('public.danke', id=adresse.id))
|
||||
|
||||
fragen = Frage.query.all()
|
||||
return render_template('index.html', fragen=fragen)
|
||||
|
||||
|
||||
@bp.route('/danke')
|
||||
def danke():
|
||||
id = request.args.get('id')
|
||||
adresse = db.session.get(Adresse, int(id)) if id else None
|
||||
return render_template('danke.html', adresse=adresse)
|
||||
Reference in New Issue
Block a user