Compare commits

1 Commits

Author SHA1 Message Date
9eb63b111d Dateien nach "/" hochladen 2026-04-01 08:00:29 +02:00
3 changed files with 256 additions and 331 deletions

2
.gitignore vendored
View File

@@ -1,2 +0,0 @@
# Testskripte werden nicht versioniert (enthalten ggf. sensible Produktionsskripte)
testskripte/

View File

@@ -1,74 +0,0 @@
# RepoVizChecker
Ein Python-Werkzeug zum Abgleich von **RepoViz-Annotationen** mit dem tatsächlich verwendeten SQL-Code in Kornshell-Skripten (`.ksh`).
---
## Beschreibung
KSH-Skripte enthalten häufig RepoViz-Metadaten in Form von Kommentar-Annotationen:
```ksh
#@modul: mein_skript.ksh
#@quelle: TABELLE_A, TABELLE_B
#@ziel: TABELLE_Z
```
`repovizcheck.py` liest diese Annotationen aus und vergleicht sie mit den Tabellennamen, die im Code über `$SCHEMA.<TABELLE>` bzw. `${SCHEMA}.<TABELLE>` tatsächlich referenziert werden.
---
## Features
- ✅ Liest `#@modul:`, `#@quelle:` und `#@ziel:` Annotationen aus dem KSH-Skript
- ✅ Erkennt verwendete Tabellen per Regex (`$SCHEMA.*` / `${SCHEMA}.*`)
- ✅ Vergleich **Repo → SQL**: Sind alle dokumentierten Tabellen im Code vorhanden?
- ✅ Vergleich **SQL → Repo**: Sind alle Code-Tabellen in den Annotationen dokumentiert?
- ✅ Prüft, ob der Skriptname in `#@modul:` eingetragen ist
- ✅ Encoding-Fallback: UTF-8 → ISO-8859-1
- ✅ Temporäre Tabellen (`TMP_*`) werden automatisch ignoriert
---
## Voraussetzungen
- Python 3.x
- Keine zusätzlichen Pakete erforderlich (nur Standardbibliothek)
---
## Aufruf
```powershell
python repovizcheck.py <skript.ksh>
```
### Beispiel
```powershell
python repovizcheck.py C:\projekte\etl_job.ksh
```
---
## Ausgabe (Übersicht)
| Abschnitt | Beschreibung |
|---|---|
| **RepoViz-Informationen** | Sortierte Ausgabe von Modul, Quell- und Zieltabellen |
| **Modul-Check** | Ist der Skriptname in `#@modul:` eingetragen? |
| **Repo → SQL** | Welche dokumentierten Tabellen sind im Code vorhanden / fehlen? |
| **SQL → Repo** | Welche Code-Tabellen sind dokumentiert / nicht dokumentiert? |
---
## Hinweise
- Es werden ausschließlich `.ksh`-Dateien akzeptiert (case-insensitive).
- Tabellennamen werden für alle Vergleiche in **Großbuchstaben** normalisiert.
---
## Lizenz
Internes Werkzeug Deutsche Telekom AG

View File

@@ -1,255 +1,256 @@
#!/usr/bin/env python """
# -*- coding: utf-8 -*- repovizcheck.py
"""
repovizcheck.py Kurzbeschreibung:
Dieses Skript vergleicht RepoViz-Metadaten (Annotationen in Kornshell-Skripten: "#@modul:", "#@quelle:", "#@ziel:")
Kurzbeschreibung: mit tatsächlich im Skript verwendeten Tabellen (Erkennung über $SCHEMA.<TABELLE> oder ${SCHEMA}<TABELLE>). Es
Dieses Skript vergleicht RepoViz-Metadaten (Annotationen in Kornshell-Skripten: "#@modul:", "#@quelle:", "#@ziel:") ermöglicht einen schnellen Abgleich, welche Tabellen in RepoViz dokumentiert sind und welche im Code auftauchen.
mit tatsächlich im Skript verwendeten Tabellen (Erkennung über $SCHEMA.<TABELLE> oder ${SCHEMA}<TABELLE>). Es
ermöglicht einen schnellen Abgleich, welche Tabellen in RepoViz dokumentiert sind und welche im Code auftauchen. Aufruf:
python repovizcheck.py <skript.ksh>
Aufruf:
python repovizcheck.py <skript.ksh> Wichtige Hinweise:
- Akzeptierte Dateiendung: .ksh (case-insensitive).
Wichtige Hinweise: - Kodierung: versucht zunächst UTF-8, bei Fehlern Fallback auf ISO-8859-1.
- Akzeptierte Dateiendung: .ksh (case-insensitive). - Tabellennamen werden für Vergleiche normalisiert (Uppercase); Regex-Parsing wurde robustifiziert.
- Kodierung: versucht zunächst UTF-8, bei Fehlern Fallback auf ISO-8859-1. """
- Tabellennamen werden für Vergleiche normalisiert (Uppercase); Regex-Parsing wurde robustifiziert.
""" from __future__ import print_function
import sys
from __future__ import print_function import re
import sys import os.path as p
import re import argparse
import os.path as p
import argparse print(r" ____ __ ___ ____ _ _ ")
import io print(r"| _ \ ___ _ __ ___ \ \ / (_)____ / ___| |__ ___ ___| | _____ _ __ ")
print(r"| |_) / _ \ '_ \ / _ \ \ \ / /| |_ / | | | '_ \ / _ \/ __| |/ / _ \ '__|")
print(r" ____ __ ___ ____ _ _ ") print(r"| _ < __/ |_) | (_) | \ V / | |/ / | |___| | | | __/ (__| < __/ | ")
print(r"| _ \ ___ _ __ ___ \ \ / (_)____ / ___| |__ ___ ___| | _____ _ __ ") print(r"|_| \_\___| .__/ \___/ \_/ |_/___| \____|_| |_|\___|\___|_|\_\___|_| ")
print(r"| |_) / _ \ '_ \ / _ \ \ \ / /| |_ / | | | '_ \ / _ \/ __| |/ / _ \ '__|") print(r" |_| ")
print(r"| _ < __/ |_) | (_) | \ V / | |/ / | |___| | | | __/ (__| < __/ | ")
print(r"|_| \_\___| .__/ \___/ \_/ |_/___| \____|_| |_|\___|\___|_|\_\___|_| ")
print(r" |_| ") such_quelle = "#@quelle:" #
such_ziel = "#@ziel:" #
such_modul = "#@modul:" #
such_quelle = "#@quelle:" #
such_ziel = "#@ziel:" # allezeilen = [] # Das Skript wird zeilenwiese in diese Liste geschrieben, eine Zeile gleich ein Element der Liste
such_modul = "#@modul:" # kommentarzeilen = [] # Ein Teil der nicht benoetigten Zeilen kommt in diese Liste
codezeilen = [] # Zeilen mit Code kommen in diese Liste
allezeilen = [] # Das Skript wird zeilenwiese in diese Liste geschrieben, eine Zeile gleich ein Element der Liste # Liste der gueltigen Dateiendungen (case-insensitive)
kommentarzeilen = [] # Ein Teil der nicht benoetigten Zeilen kommt in diese Liste perm_file_suffix = ['.ksh'] # Liste der gueltigen Dateiendungen
codezeilen = [] # Zeilen mit Code kommen in diese Liste
# Liste der gueltigen Dateiendungen (case-insensitive) # Verwende argparse für robusten CLI-Aufruf und Help
perm_file_suffix = ['.ksh'] # Liste der gueltigen Dateiendungen parser = argparse.ArgumentParser(description='Check RepoViz annotations against SQL in a ksh script')
parser.add_argument('sourcefile', help='Kornshell script to check (must end with .ksh)')
# Verwende argparse für robusten CLI-Aufruf und Help args = parser.parse_args()
parser = argparse.ArgumentParser(description='Check RepoViz annotations against SQL in a ksh script') sourcefile = args.sourcefile
parser.add_argument('sourcefile', help='Kornshell script to check (must end with .ksh)')
args = parser.parse_args() # Dateiendung prüfen (case-insensitive)
sourcefile = args.sourcefile file_suffix = p.splitext(sourcefile)[1].lower()
if file_suffix not in perm_file_suffix:
# Dateiendung prüfen (case-insensitive) print() # Gib eine leere Zeile aus
file_suffix = p.splitext(sourcefile)[1].lower() print("Only Kornshell scripts can be processed.") # Gib diese Meldung aus
if file_suffix not in perm_file_suffix: print()
print() # Gib eine leere Zeile aus sys.exit(1)
print("Only Kornshell scripts can be processed.") # Gib diese Meldung aus
print()
sys.exit(1) # Funktion zum Oeffnen einer Datei mit Pruefung
def get(name):
'''Funktion zum Oeffnen einer Datei mit Pruefung (prüft Encoding)'''
# Funktion zum Oeffnen einer Datei mit Pruefung # Versuche zuerst UTF-8, lese die gesamte Datei zur Validierung.
def get(name): try:
'''Funktion zum Oeffnen einer Datei mit Pruefung (prüft Encoding)''' f = open(name, "r", encoding='utf-8')
# io.open() wird verwendet, da es in Python 2 und 3 den encoding-Parameter unterstuetzt. f.read() # komplette Datei lesen, um Decode-Fehler sicher zu erkennen
# Versuche zuerst UTF-8, lese die gesamte Datei zur Validierung. f.seek(0)
try: return f
f = io.open(name, "r", encoding='utf-8') except UnicodeDecodeError:
f.read() # komplette Datei lesen, um Decode-Fehler sicher zu erkennen # UTF-8 passt nicht — versuche ISO-8859-1
f.seek(0) try:
return f f.close()
except UnicodeDecodeError: return open(name, "r", encoding='ISO-8859-1')
# UTF-8 passt nicht — versuche ISO-8859-1 except IOError:
try: print("")
f.close() print("The File", name, "can't be opened with fallback encoding!")
return io.open(name, "r", encoding='ISO-8859-1') print("")
except IOError: sys.exit(1)
print("") except IOError:
print("The File", name, "can't be opened with fallback encoding!") print("")
print("") print("The File", name, "doesn't exist or can't be opened!")
sys.exit(1) print("")
except IOError: sys.exit(1)
print("") except Exception:
print("The File", name, "doesn't exist or can't be opened!") # unerwarteter Fehler beim Lesen
print("") try:
sys.exit(1) f.close()
except Exception: except Exception:
# unerwarteter Fehler beim Lesen pass
try: print("")
f.close() print("Error reading the file", name)
except Exception: print("")
pass sys.exit(1)
print("") return None
print("Error reading the file", name)
print("")
sys.exit(1) # Funktion zum Erstellen einer Tabellenliste aus den RepoViz Informationen
return None def erstelle_liste(datei, typ): # 2 Parameter
'''Erstelle eine Liste aus der übergebenen Datei für den Typ der übergeben wurde'''
laenge = len(typ)
# Funktion zum Erstellen einer Tabellenliste aus den RepoViz Informationen tabellenliste = []
def erstelle_liste(datei, typ): # 2 Parameter fobj_in = get(datei)
'''Erstelle eine Liste aus der übergebenen Datei für den Typ der übergeben wurde''' for line in fobj_in:
laenge = len(typ) # entferne nur führende Leerzeichen vor der Prüfung
tabellenliste = [] content = line.lstrip()
fobj_in = get(datei) if content.startswith(typ):
for line in fobj_in: zeile = content[len(typ):].strip()
# entferne nur führende Leerzeichen vor der Prüfung # entferne ein mögliches abschließendes Komma
content = line.lstrip() if zeile.endswith(','):
if content.startswith(typ): zeile = zeile[:-1]
zeile = content[len(typ):].strip() liste = [it.strip() for it in zeile.split(',') if it.strip()]
# entferne ein mögliches abschließendes Komma for item in liste:
if zeile.endswith(','): # normalisiere auf Großbuchstaben für spätere Vergleiche
zeile = zeile[:-1] tabellenliste.append(item.strip().upper())
liste = [it.strip() for it in zeile.split(',') if it.strip()] fobj_in.close()
for item in liste: return tabellenliste
# normalisiere auf Großbuchstaben für spätere Vergleiche
tabellenliste.append(item.strip().upper())
fobj_in.close() def trennzeile(typ): # Funktion zum Ausgeben einer 80 Zeichen breiten Trennzeile.
return tabellenliste '''
Erstellt eine 80 zeichenbreite Zeile mit dem übergebenen Zeichen
'''
def trennzeile(typ): # Funktion zum Ausgeben einer Trennzeile. print(typ * 80) # Das Trennzeichen ist variabel und wird der Funktion als Parameter uebergeben.
'''
Erstellt eine 87 zeichenbreite Zeile mit dem übergebenen Zeichen
''' modulliste = erstelle_liste(sourcefile, such_modul) # Erstelle Liste mit den Modulen
print(typ * 87) # Das Trennzeichen ist variabel und wird der Funktion als Parameter uebergeben. quelleliste = erstelle_liste(sourcefile, such_quelle) # Erstelle Liste mit den Quellen
zielliste = erstelle_liste(sourcefile, such_ziel) # Erstelle Liste mit den Zielen
modulliste = erstelle_liste(sourcefile, such_modul) # Erstelle Liste mit den Modulen datei = get(sourcefile) # Oeffnen der Datei mit einer Funktion, die auch prueft ob die Datei existiert
quelleliste = erstelle_liste(sourcefile, such_quelle) # Erstelle Liste mit den Quellen allezeilen = datei.readlines() # Lesen aller Zeilen in eine Liste
zielliste = erstelle_liste(sourcefile, such_ziel) # Erstelle Liste mit den Zielen datei.close()
datei = get(sourcefile) # Oeffnen der Datei mit einer Funktion, die auch prueft ob die Datei existiert # Versuch die Kommentarzeilen loszuwerden
allezeilen = datei.readlines() # Lesen aller Zeilen in eine Liste for zeile in allezeilen:
datei.close() s = zeile.strip()
if not s:
# Versuch die Kommentarzeilen loszuwerden # leere Zeilen überspringen
for zeile in allezeilen: continue
s = zeile.strip() if s.startswith('#@modul:'):
if not s: codezeilen.append(s)
# leere Zeilen überspringen continue
continue # Gruppiere typische Kommentar-/Meta-Zeilen (case-insensitive)
if s.startswith('#@modul:'): up = s.upper()
codezeilen.append(s) if s.startswith('#') or s.startswith('--') or up.startswith('J000') or up.startswith('SQLFILE') \
continue or up.startswith('FMELDUNG') or up.startswith('ALTER') or up.startswith('F_TRUNCATE') \
# Gruppiere typische Kommentar-/Meta-Zeilen (case-insensitive) or up == 'FI' or up == 'IF' or up.startswith('F_RUNSTATS'):
up = s.upper() kommentarzeilen.append(s)
if s.startswith('#') or s.startswith('--') or up.startswith('J000') or up.startswith('SQLFILE') \ else:
or up.startswith('FMELDUNG') or up.startswith('ALTER') or up.startswith('F_TRUNCATE') \ codezeilen.append(s)
or up == 'FI' or up == 'IF' or up.startswith('F_RUNSTATS'):
kommentarzeilen.append(s)
else: trennzeile("+")
codezeilen.append(s) print("Sorted output of RepoViz information.")
print("")
ausgabe = "{:10}{:}" # Definition AusgabeFormat 1.Feld begrenzt auf 10 Zeichen, das 2.Feld unbegrenzt
trennzeile("+") quelleliste.sort() # Sortiere Liste <quelleliste>
print("Sorted output of RepoViz information.") zielliste.sort() # Sortiere Liste <zielliste>
print("") print(ausgabe.format(such_modul, modulliste)) # Ausgabe <modulliste>
ausgabe = "{:10}{:}" # Definition AusgabeFormat 1.Feld begrenzt auf 10 Zeichen, das 2.Feld unbegrenzt print(ausgabe.format(such_quelle, quelleliste)) # Ausgabe <quelleliste>
quelleliste.sort() # Sortiere Liste <quelleliste> print(ausgabe.format(such_ziel, zielliste)) # Ausgabe <zielliste>
zielliste.sort() # Sortiere Liste <zielliste>
print(ausgabe.format(such_modul, modulliste)) # Ausgabe <modulliste> # Vergleiche sourcefile mit modulliste hier einfuegen
print(ausgabe.format(such_quelle, quelleliste)) # Ausgabe <quelleliste> # Eventuell anpassen, es steht nur der Skriptname in den RepoVizInfos
print(ausgabe.format(such_ziel, zielliste)) # Ausgabe <zielliste> trennzeile("~")
trennzeile("")
# Vergleiche sourcefile mit modulliste hier einfuegen print("Comparison of given script name with RepoVizInformation")
# Eventuell anpassen, es steht nur der Skriptname in den RepoVizInfos # Vergleiche nur einmal, normalisiere auf Großbuchstaben
trennzeile("~") script_basename = p.basename(sourcefile).upper()
trennzeile("") if script_basename in [m.strip().upper() for m in modulliste]:
print("Comparison of given script name with RepoVizInformation") print("Script", p.basename(sourcefile), "is in the RepoViz information!")
# Vergleiche nur einmal, normalisiere auf Großbuchstaben else:
script_basename = p.basename(sourcefile).upper() print("Script", p.basename(sourcefile), "is missing in the RepoViz information!")
if script_basename in [m.strip().upper() for m in modulliste]:
print("Script", p.basename(sourcefile), "is in the RepoViz information!") # Suche Objekte anhand der RepoVizInformationen
else: trennzeile("~")
print("Script", p.basename(sourcefile), "is missing in the RepoViz information!")
print("")
# Suche Objekte anhand der RepoVizInformationen print("Create a list of tables from the existing code: list <allezeilen>")
trennzeile("~") # Der Code steht schon in der Liste <codezeilen>
neue_liste = []
print("") # verbessertes Regex-Parsing: capture-Gruppe für Tabellennamen, compile einmal
print("Create a list of tables from the existing code: list <allezeilen>") regex = re.compile(r"(?:\$SCHEMA\.|\${SCHEMA}\.)\s*([A-Z0-9_]+)", re.IGNORECASE)
# Der Code steht schon in der Liste <codezeilen> # nutze Set für eindeutige Sammlung
neue_liste = [] neue_set = set()
# verbessertes Regex-Parsing: capture-Gruppe für Tabellennamen, compile einmal for line in allezeilen:
regex = re.compile(r"(?:\$SCHEMA\.|\${SCHEMA}\.)\s*([A-Z0-9_]+)", re.IGNORECASE) for match in regex.findall(line):
# nutze Set für eindeutige Sammlung tabname = match.strip()
neue_set = set() if not tabname:
for line in allezeilen: continue
for match in regex.findall(line): # ignoriere Variablen oder temporäre Tabellen
tabname = match.strip() if tabname.startswith('$'):
if not tabname: continue
continue tabname_norm = re.sub(r"[^A-Z0-9_]", "", tabname.upper())
# ignoriere Variablen oder temporäre Tabellen if tabname_norm and not tabname_norm.startswith('TMP_'):
if tabname.startswith('$'): neue_set.add(tabname_norm)
continue neue_liste = sorted(neue_set)
tabname_norm = re.sub(r"[^A-Z0-9_]", "", tabname.upper())
if tabname_norm and not tabname_norm.startswith('TMP_'): print("The list contains:", len(neue_liste), "entries")
neue_set.add(tabname_norm) trennzeile("~")
neue_liste = sorted(neue_set)
print("The list contains:", len(neue_liste), "entries") # Vergleich repo -> SQL
trennzeile("~") # 2 Teile
# Quelle - quelleliste
# nehme Liste "quelleliste" und suche damit in Liste "neue_liste"
# Vergleich repo -> SQL # trennzeile("#")
# 2 Teile print("Are the tables of the list", such_quelle, "included in the SQL?")
# Quelle - quelleliste trennzeile("~")
# nehme Liste "quelleliste" und suche damit in Liste "neue_liste" for item in quelleliste:
# trennzeile("#") # quelleliste wurde in erstelle_liste bereits normalisiert (upper)
ausgabe = "{:12}{:55}{:20}" # Ausgabeformat: Label(12), Tabellenname(55), Status(20) if item.upper() in neue_liste:
print("Are the tables of the list", such_quelle, "included in the SQL?") ausgabe = "{:12}{:40}{:20}"
trennzeile("~") print(ausgabe.format("The Table", item, "is available"))
for item in quelleliste: else:
# quelleliste wurde in erstelle_liste bereits normalisiert (upper) ausgabe = "{:12}{:40}{:20}"
if item.upper() in neue_liste: print(ausgabe.format("The Table", item, "is not available"))
print(ausgabe.format("The Table", item, "is available"))
else: # Quelle - zielliste
print(ausgabe.format("The Table", item, "is not available")) # nehme Liste "zielliste" und suche damit in Liste "neue_liste"
trennzeile("~")
# Quelle - zielliste print("Are the tables of the list", such_ziel, "included in the SQL?")
# nehme Liste "zielliste" und suche damit in Liste "neue_liste" trennzeile("~")
trennzeile("~") for item in zielliste:
print("Are the tables of the list", such_ziel, "included in the SQL?") if item.upper() in neue_liste:
trennzeile("~") ausgabe = "{:12}{:40}{:20}"
for item in zielliste: print(ausgabe.format("The Table", item, "is available"))
if item.upper() in neue_liste: else:
print(ausgabe.format("The Table", item, "is available")) ausgabe = "{:12}{:40}{:20}"
else: print(ausgabe.format("The Table", item, "is not available"))
print(ausgabe.format("The Table", item, "is not available"))
trennzeile("~")
trennzeile("~") print("Create unique sorted list from ", such_quelle, "and", such_ziel)
print("Create unique sorted list from ", such_quelle, "and", such_ziel) q_z_liste = sorted({t.upper() for t in (quelleliste + zielliste)})
q_z_liste = sorted({t.upper() for t in (quelleliste + zielliste)})
trennzeile("~")
trennzeile("~")
# Vergleich SQL -> repo
# Vergleich SQL -> repo # Quelle - neue_liste
# Quelle - neue_liste # nehme Liste "quelleliste" und suche damit in Liste "neue_liste"
# nehme Liste "quelleliste" und suche damit in Liste "neue_liste" trennzeile("~")
trennzeile("~") print("Are the tables from the SQL included in the RepoViz information?")
print("Are the tables from the SQL included in the RepoViz information?") print("Note: I've merged <#@quelle> and <#@ziel> into one list ")
print("Note: I've merged <#@quelle> and <#@ziel> into one list ") trennzeile("~")
trennzeile("~") for item in neue_liste:
for item in neue_liste: if item in q_z_liste:
if item in q_z_liste: ausgabe = "{:12}{:40}{:20}"
print(ausgabe.format("The Table", item, "is available")) print(ausgabe.format("The Table", item, "is available"))
else: else:
print(ausgabe.format("The Table", item, "is not available")) ausgabe = "{:12}{:40}{:20}"
print(ausgabe.format("The Table", item, "is not available"))
trennzeile("+")
print(r" _____ _ _____ _ ") trennzeile("+")
print(r"|_ _| |__ ___ | ____|_ __ __| |") print(r" _____ _ _____ _ ")
print(r" | | | '_ \ / _ \ | _| | '_ \ / _` |") print(r"|_ _| |__ ___ | ____|_ __ __| |")
print(r" | | | | | | __/ | |___| | | | (_| |") print(r" | | | '_ \ / _ \ | _| | '_ \ / _` |")
print(r" |_| |_| |_|\___| |_____|_| |_|\__,_|") print(r" | | | | | | __/ | |___| | | | (_| |")
print("") print(r" |_| |_| |_|\___| |_____|_| |_|\__,_|")
print("")