From: Shane Jaroch Date: Tue, 30 Dec 2025 04:10:58 +0000 (-0500) Subject: Automated tests, session caching, place API, cleanup X-Git-Url: https://git.nutra.tk/v1?a=commitdiff_plain;h=894ed42da567ab6f4d714ab1950847c34ae0cda9;p=gamesguru%2Fgetmyancestors.git Automated tests, session caching, place API, cleanup Testing & CI: - d78b16e working coverage (28% only for now) - c9b9d7b add more unit tests - 527b685 add pytest and coverage targets - a6d07c4 beef up GitHub action - 3bf4ae8 fix Windows encoding error - 0ff93da fix macOS runner - 92d4f98 add .envrc for direnv Linting: - 62e1cd0 add ruff, black; format code in CI - 78f1f38 isort & black agree Features: - dc77f9f CACHE: add cache-control opt (conditional requests) - 21fdb59 working headless automation for evading bot-detection - 2b8f110 add geocoder to requirements Other contributors' work included: - @jadsongmatos: requests_cache HTTP caching - @bsudy: FS IDs, request caching, Geonames place API, Gramps XML export [alpha] - @josemando: requests-ratelimiter (rate limiting) Co-authored-by: jadsongmatos Co-authored-by: Barnabás Südy Co-authored-by: Josemando Sobral Signed-off-by: Shane Jaroch --- diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..c9d179e --- /dev/null +++ b/.envrc @@ -0,0 +1,5 @@ +source .venv/bin/activate +unset PS1 +source .env +export PYTHONPATH=. + diff --git a/.geminiignore b/.geminiignore new file mode 100644 index 0000000..e1aaf1f --- /dev/null +++ b/.geminiignore @@ -0,0 +1,3 @@ +!.gemini/ +!test_debug.py + diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..d1f862c --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,67 @@ +--- +name: ci + +"on": + push: {} + +permissions: + contents: read + +jobs: + test: + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + + env: + SKIP_VENV: 1 + + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Fetch master (for incremental diff, lint filter mask) + run: git fetch origin master + + - name: Reload Cache / pip + uses: actions/setup-python@v5 + with: + python-version: 3 + cache: "pip" # caching pip dependencies + cache-dependency-path: "**/*requirements*.txt" + # update-environment: false + + - name: Install requirements + run: | + pip install -r requirements.txt + pip install -r .requirements-lint.txt + + # NOTE: pytest is needed to lint the folder: "tests/" + # pip install -r requirements-test.txt + + - name: format + run: make format + + - name: Lint + run: make lint + + - name: Verify no new formatting changes applied + run: | + git update-index -q --refresh + git diff # show the diff + git diff-index --quiet HEAD -- # exit non-zero on any diff + + - name: Test [Unit] + env: + FAMILYSEARCH_USER: ${{ secrets.FAMILYSEARCH_USER }} + FAMILYSEARCH_PASS: ${{ secrets.FAMILYSEARCH_PASS }} + run: make test/unit + + - name: Test [E2E] + env: + FAMILYSEARCH_USER: ${{ secrets.FAMILYSEARCH_USER }} + FAMILYSEARCH_PASS: ${{ secrets.FAMILYSEARCH_PASS }} + run: make test/e2e diff --git a/.gitignore b/.gitignore index 0679b96..32e5785 100644 --- a/.gitignore +++ b/.gitignore @@ -133,15 +133,18 @@ dmypy.json # Redis dump.rdb -# Dotfiles -.* -!.gitignore -!.readthedocs.yml - # vscode .vscode/ # getmyancestors stuff *.log +*.txt *.settings -*.ged \ No newline at end of file +*.ged +*.db +*.sqlite +*.sqlite3 + +!.geminiignore +/test_debug.py + diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..4627af6 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,19 @@ +[MASTER] + +fail-under=9.5 + + +[MESSAGES CONTROL] + +disable= + fixme, + consider-using-f-string, + missing-module-docstring, + missing-function-docstring, + duplicate-code, + too-few-public-methods, + too-many-arguments, + too-many-positional-arguments, + too-many-instance-attributes, + too-many-branches, + too-many-statements, diff --git a/.requirements-lint.txt b/.requirements-lint.txt new file mode 100644 index 0000000..df0f3b6 --- /dev/null +++ b/.requirements-lint.txt @@ -0,0 +1,10 @@ +black==25.12.0 +coverage==7.13.1 +flake8==7.3.0 +isort==7.0.0 +mypy==1.19.1 +pylint==4.0.4 +pytest==9.0.2 +ruff==0.14.10 +types-requests==2.32.4.20250913 + diff --git a/.tmp/.gitignore b/.tmp/.gitignore new file mode 100644 index 0000000..1287e9b --- /dev/null +++ b/.tmp/.gitignore @@ -0,0 +1,2 @@ +** +!.gitignore diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f894df4 --- /dev/null +++ b/Makefile @@ -0,0 +1,74 @@ +# .ONESHELL: +SHELL:=/bin/bash +.DEFAULT_GOAL=_help + +.PHONY: _help +_help: + @printf "\nUsage: make , valid commands:\n\n" + @grep -h "##H@@" $(MAKEFILE_LIST) | grep -v IGNORE_ME | sed -e 's/##H@@//' | column -t -s $$'\t' + +# help: ## Show this help +# @grep -Eh '\s##\s' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' + + +-include .env + +.PHONY: test/e2e +test/e2e: ##H@@ E2E/Smoke test for Bertrand Russell (LZDB-KV4) + which python + coverage run -p -m getmyancestors --verbose \ + -u "${FAMILYSEARCH_USER}" `# password goes in .env file` \ + --no-cache-control \ + -i LZDB-KV4 -a 0 \ + --outfile .tmp/russell_smoke_test.ged + echo "✓ Script completed successfully" + echo "File size: $(wc -c < .tmp/russell_smoke_test.ged) bytes" + echo "Line count: $(wc -l < .tmp/russell_smoke_test.ged) lines" + echo "--- First 20 lines of output ---" + head -n 20 .tmp/russell_smoke_test.ged + echo "--- Last 5 lines of output ---" + tail -n 5 .tmp/russell_smoke_test.ged + + +.PHONY: test/unit +test/unit: ##H@@ Run unit tests + coverage run -p -m pytest getmyancestors/tests + +.PHONY: test/ +test/: ##H@@ Run unit & E2E tests +test/: test/unit test/e2e + +.PHONY: coverage +coverage: ##H@@ Combine all coverage data and show report + -coverage combine + coverage report + + +REMOTE_HEAD ?= origin/master +PY_CHANGED_FILES ?= $(shell git diff --name-only --diff-filter=MACU $(REMOTE_HEAD) '*.py') + +.PHONY: format +format: ##H@@ Format with black & isort + isort ${PY_CHANGED_FILES} + black ${PY_CHANGED_FILES} + ruff check --fix --exit-zero ${PY_CHANGED_FILES} + +.PHONY: lint +lint: ##H@@ Lint with flake8 + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # x-fail as of Dec 2025 + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + ruff check --exit-zero ${PY_CHANGED_FILES} + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # Disabled checks, for now + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # pylint ${PY_CHANGED_FILES} + # mypy ${PY_CHANGED_FILES} + + +.PHONY: clean +clean: ##H@@ Clean up build files/cache + rm -rf *.egg-info build dist .coverage + find . \( -name .venv -prune \) \ + -o \( -name __pycache__ -o -name .mypy_cache -o -name .ruff_cache -o -name .pytest_cache \) \ + -exec rm -rf {} + diff --git a/getmyancestors/__init__.py b/getmyancestors/__init__.py index 79d2b16..91a0d77 100644 --- a/getmyancestors/__init__.py +++ b/getmyancestors/__init__.py @@ -1,6 +1,4 @@ # coding: utf-8 -from . import getmyancestors -from . import mergemyancestors -__version__ = "1.0.6" +__version__ = "1.1.2" diff --git a/getmyancestors/__main__.py b/getmyancestors/__main__.py index 3b766b3..af423a0 100644 --- a/getmyancestors/__main__.py +++ b/getmyancestors/__main__.py @@ -1,3 +1,3 @@ -from getmyancestors import getmyancestors +from getmyancestors import getmyanc -getmyancestors.main() +getmyanc.main() diff --git a/getmyancestors/classes/constants.py b/getmyancestors/classes/constants.py index 9b80a64..eba1add 100644 --- a/getmyancestors/classes/constants.py +++ b/getmyancestors/classes/constants.py @@ -4,11 +4,11 @@ MAX_PERSONS = 200 FACT_TAG_EVENT_TYPE = { - 'BIRT': 'Birth', - 'DEAT': 'Death', - 'BURI': 'Burial', - 'CREM': 'Cremation', - 'NATU': 'Naturalization', + "BIRT": "Birth", + "DEAT": "Death", + "BURI": "Burial", + "CREM": "Cremation", + "NATU": "Naturalization", } FACT_TAGS = { @@ -52,8 +52,9 @@ ORDINANCES_STATUS = { "NotNeeded": "INFANT", } + # mergemyancestors constants and functions -def reversed_dict(d): +def reversed_dict(d: dict) -> dict: return {val: key for key, val in d.items()} diff --git a/getmyancestors/classes/gedcom.py b/getmyancestors/classes/gedcom.py index 3dbb10d..1fb45fd 100644 --- a/getmyancestors/classes/gedcom.py +++ b/getmyancestors/classes/gedcom.py @@ -1,15 +1,15 @@ # mergemyancestors classes +from getmyancestors.classes.constants import FACT_TYPES, ORDINANCES from getmyancestors.classes.tree import ( - Indi, Fact, Fam, + Indi, Memorie, Name, Note, Ordinance, Source, ) -from getmyancestors.classes.constants import FACT_TYPES, ORDINANCES class Gedcom: @@ -194,7 +194,7 @@ class Gedcom: if self.tag == "DATE": fact.date = self.__get_text() elif self.tag == "PLAC": - fact.place = self.__get_text() + fact.place = self.tree.ensure_place(self.__get_text()) elif self.tag == "MAP": fact.map = self.__get_map() elif self.tag == "NOTE": diff --git a/getmyancestors/classes/gui.py b/getmyancestors/classes/gui.py index 4b4c7d9..ac67238 100644 --- a/getmyancestors/classes/gui.py +++ b/getmyancestors/classes/gui.py @@ -1,26 +1,19 @@ # fstogedcom classes and functions +import asyncio import os import re -import time -import asyncio import tempfile +import time from threading import Thread +from tkinter import IntVar, Menu, StringVar, TclError, filedialog, messagebox +from tkinter.ttk import Button, Checkbutton, Entry, Frame, Label, Notebook, Treeview + from diskcache import Cache -from tkinter import ( - StringVar, - IntVar, - filedialog, - messagebox, - Menu, - TclError, -) -from tkinter.ttk import Frame, Label, Entry, Button, Checkbutton, Treeview, Notebook - -from getmyancestors.classes.tree import Indi, Fam, Tree from getmyancestors.classes.gedcom import Gedcom from getmyancestors.classes.session import Session from getmyancestors.classes.translation import translations +from getmyancestors.classes.tree import Fam, Indi, Tree tmp_dir = os.path.join(tempfile.gettempdir(), "fstogedcom") cache = Cache(tmp_dir) @@ -258,7 +251,13 @@ class SignIn(Frame): self.save_password = IntVar() self.save_password.set(cache.get("save_password") or 0) - check_save_password = Checkbutton(self, text=_("Save Password"), variable=self.save_password, onvalue=1, offvalue=0) + check_save_password = Checkbutton( + self, + text=_("Save Password"), + variable=self.save_password, + onvalue=1, + offvalue=0, + ) label_username.grid(row=0, column=0, pady=15, padx=(0, 5)) entry_username.grid(row=0, column=1) @@ -512,7 +511,7 @@ class Download(Frame): cache.add("save_password", save_pass) url = "/service/tree/tree-data/reservations/person/%s/ordinances" % self.fs.fid - lds_account = self.fs.get_url(url, {}).get("status") == "OK" + lds_account = self.fs.get_url(url, {}, no_api=True).get("status") == "OK" self.options = Options(self.form, lds_account) self.info("") self.sign_in.destroy() diff --git a/getmyancestors/classes/session.py b/getmyancestors/classes/session.py index 30ed47a..2982661 100644 --- a/getmyancestors/classes/session.py +++ b/getmyancestors/classes/session.py @@ -1,19 +1,23 @@ -# global imports +import contextlib +import json +import os +import sqlite3 import sys import time -from urllib.parse import urlparse, parse_qs +import webbrowser +from urllib.parse import parse_qs, urlparse import requests from requests_cache import CachedSession as CSession -from fake_useragent import UserAgent - from requests_ratelimiter import LimiterAdapter # local imports from getmyancestors.classes.translation import translations +DEFAULT_CLIENT_ID = "a02j000000KTRjpAAH" +DEFAULT_REDIRECT_URI = "https://misbach.github.io/fs-auth/index_raw.html" + -# class Session(requests.Session): class GMASession: """Create a FamilySearch session :param username and password: valid FamilySearch credentials @@ -22,28 +26,126 @@ class GMASession: :param timeout: time before retry a request """ - def __init__(self, username, password, verbose=False, logfile=False, timeout=60): - # super().__init__('http_cache', backend='filesystem', expire_after=86400) - # super().__init__() + def __init__( + self, + username, + password, + client_id=None, + redirect_uri=None, + verbose=False, + logfile=False, + timeout=60, + ): self.username = username self.password = password + self.client_id = client_id or DEFAULT_CLIENT_ID + self.redirect_uri = redirect_uri or DEFAULT_REDIRECT_URI self.verbose = verbose self.logfile = logfile self.timeout = timeout self.fid = self.lang = self.display_name = None self.counter = 0 - self.headers = {"User-Agent": UserAgent().firefox} + + # Persistence setup + os.makedirs("http_cache", exist_ok=True) + self.db_path = "http_cache/requests.sqlite" + self.cookie_file = os.path.expanduser("~/.getmyancestors_cookies.json") + self._init_db() + + # Hardcode robust User-Agent to avoid bot detection + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept-Language": "en-US,en;q=0.9", + } # Apply a rate-limit (5 requests per second) to all requests + # Credit: Josemando Sobral adapter = LimiterAdapter(per_second=5) - self.mount('http://', adapter) - self.mount('https://', adapter) + self.mount("http://", adapter) + self.mount("https://", adapter) self.login() + def _init_db(self): + """Initialize SQLite database for session storage""" + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS session (key TEXT PRIMARY KEY, value TEXT)" + ) + conn.commit() + @property def logged(self): - return bool(self.cookies.get("fssessionid")) + return bool( + self.cookies.get("fssessionid") or self.headers.get("Authorization") + ) + + def save_cookies(self): + """save cookies and authorization header to SQLite""" + try: + data = { + "cookies": requests.utils.dict_from_cookiejar(self.cookies), + "auth": self.headers.get("Authorization"), + } + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "REPLACE INTO session (key, value) VALUES ('current', ?)", + (json.dumps(data),), + ) + conn.commit() + + if self.verbose: + self.write_log("Session saved to SQLite: " + self.db_path) + except Exception as e: + self.write_log("Error saving session: " + str(e)) + + def load_cookies(self): + """load cookies and authorization header from SQLite or migrate from JSON""" + # 1. Try SQLite first + try: + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT value FROM session WHERE key = 'current'" + ).fetchone() + if row: + data = json.loads(row[0]) + self._apply_session_data(data) + if self.verbose: + self.write_log("Session loaded from SQLite") + return True + except Exception as e: + self.write_log("Error loading session from SQLite: " + str(e)) + + # 2. Migration from JSON if exists + if os.path.exists(self.cookie_file): + try: + with open(self.cookie_file, "r", encoding="utf-8") as f: + data = json.load(f) + self._apply_session_data(data) + self.save_cookies() # Save to SQLite + os.rename( + self.cookie_file, self.cookie_file + ".bak" + ) # Backup and disable + if self.verbose: + self.write_log("Migrated session from JSON to SQLite") + return True + except Exception as e: + self.write_log("Error migrating session from JSON: " + str(e)) + + return False + + def _apply_session_data(self, data): + """Internal helper to apply session dict to current session""" + if isinstance(data, dict) and ("cookies" in data or "auth" in data): + cookies_dict = data.get("cookies", {}) + auth_header = data.get("auth") + else: + cookies_dict = data + auth_header = None + + self.cookies.update(requests.utils.cookiejar_from_dict(cookies_dict)) + if auth_header: + self.headers.update({"Authorization": auth_header}) def write_log(self, text): """write text in the log file""" @@ -57,12 +159,40 @@ class GMASession: """retrieve FamilySearch session ID (https://familysearch.org/developers/docs/guides/oauth2) """ - while True: + if self.load_cookies(): + if self.verbose: + self.write_log("Attempting to reuse cached session...") + # Use auto_login=False to prevent recursion if session is invalid + self.set_current(auto_login=False) + if self.logged and self.fid: + if self.verbose: + self.write_log("Successfully reused cached session.") + return + if self.verbose: + self.write_log("Cached session invalid or expired.") + + # Define context manager for disabling cache + if hasattr(self, "cache_disabled"): + cache_context = self.cache_disabled() + else: + cache_context = contextlib.nullcontext() + + with cache_context: try: + if not self.username or not self.password: + return self.manual_login() + + # Clear cookies to ensure fresh start for new login + self.cookies.clear() + url = "https://www.familysearch.org/auth/familysearch/login" self.write_log("Downloading: " + url) - self.get(url, headers=self.headers) - xsrf = self.cookies["XSRF-TOKEN"] + self.get(url, headers=self.headers, timeout=self.timeout) + xsrf = self.cookies.get("XSRF-TOKEN") + if not xsrf: + self.write_log("No XSRF token found. Switching to manual login.") + return self.manual_login() + url = "https://ident.familysearch.org/login" self.write_log("Downloading: " + url) res = self.post( @@ -73,82 +203,225 @@ class GMASession: "password": self.password, }, headers=self.headers, + timeout=self.timeout, ) + try: data = res.json() except ValueError: - self.write_log("Invalid auth request") - self.write_log(res.headers) - self.write_log(res.text) - - raise "Invalid auth request" - # continue - if "loginError" in data: - self.write_log(data["loginError"]) - return + self.write_log(f"Headless Login Failed. Status: {res.status_code}") + self.write_log(f"Response Preview: {res.text[:200]}") + self.write_log("Switching to manual login.") + return self.manual_login() + if "redirectUrl" not in data: - self.write_log(res.text) - continue + self.write_log("Redirect URL not found in response.") + return self.manual_login() url = data["redirectUrl"] self.write_log("Downloading: " + url) - res = self.get(url, headers=self.headers) - res.raise_for_status() + self.get(url, headers=self.headers, timeout=self.timeout) - url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id=a02j000000KTRjpAAH&redirect_uri=https://misbach.github.io/fs-auth/index_raw.html&username={self.username}" + url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id={self.client_id}&redirect_uri={self.redirect_uri}&username={self.username}" self.write_log("Downloading: " + url) - response = self.get(url, allow_redirects=False, headers=self.headers) - location = response.headers["location"] - code = parse_qs(urlparse(location).query).get("code") + + # Allow redirects so we follow the chain to the callback URI + response = self.get( + url, + allow_redirects=True, + headers=self.headers, + timeout=self.timeout, + ) + + # Check if we landed on the redirect URI (or have the code in the URL) + final_url = response.url + code = None + + if "code=" in final_url: + code = parse_qs(urlparse(final_url).query).get("code") + + # If not in final URL, check history (in case of a meta refresh or stop) + if not code and response.history: + for resp in response.history: + if "code=" in resp.headers.get("Location", ""): + code = parse_qs( + urlparse(resp.headers["Location"]).query + ).get("code") + if code: + break + + if not code: + self.write_log(f"Code not found in URL: {final_url}") + return self.manual_login(response.url) + + if isinstance(code, list): + code = code[0] + + # Use raw requests to avoid cache interference just in case url = "https://ident.familysearch.org/cis-web/oauth2/v3/token" self.write_log("Downloading: " + url) - res = self.post( + res = requests.post( url, data={ "grant_type": "authorization_code", - "client_id": "a02j000000KTRjpAAH", + "client_id": self.client_id, "code": code, - "redirect_uri": "https://misbach.github.io/fs-auth/index_raw.html", + "redirect_uri": self.redirect_uri, }, headers=self.headers, + timeout=self.timeout, ) - try: - data = res.json() - except ValueError: - self.write_log("Invalid auth request") - continue + data = res.json() + if "access_token" in data: + self.headers.update( + {"Authorization": f"Bearer {data['access_token']}"} + ) + self.set_current(auto_login=False) + if self.logged: + self.save_cookies() + return + except Exception as e: + self.write_log("Headless login error: " + str(e)) + return self.manual_login() - if "access_token" not in data: - self.write_log(res.text) - continue - access_token = data["access_token"] - self.headers.update({"Authorization": f"Bearer {access_token}"}) + def manual_login(self, auth_url=None): + """Perform manual login""" + if not auth_url: + auth_url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id={self.client_id}&redirect_uri={self.redirect_uri}&username={self.username}" - except requests.exceptions.ReadTimeout: - self.write_log("Read timed out") - continue - except requests.exceptions.ConnectionError: - self.write_log("Connection aborted") - time.sleep(self.timeout) - continue - except requests.exceptions.HTTPError: - self.write_log("HTTPError") - time.sleep(self.timeout) - continue - except KeyError: - self.write_log("KeyError") - time.sleep(self.timeout) - continue - except ValueError: - self.write_log("ValueError") - time.sleep(self.timeout) - continue - if self.logged: - self.set_current() - break + print("\n" + "=" * 60) + print("Headless login failed. Manual login required.") + print("=" * 60) + print(f"Opening browser to login: {auth_url}") + + # Only open browser if we really are in a terminal context, but user asked to stop? + # We will open it because otherwise they can't login. + try: + webbrowser.open(auth_url) + except: + pass + + print("\n" + "-" * 30) + print("MANUAL FALLBACK:") + print("1. Log in to FamilySearch in the opened window.") + print("2. Once logged in, you will be redirected.") + print( + "3. Copy the 'code' from the URL or simply copy the FULL destination URL." + ) + print( + " (If it says 'code already used', assume you need to re-login or check for Access Token)" + ) + print("-" * 30) + + while True: + try: + import getpass - def get_url(self, url, headers=None): + user_input = getpass.getpass( + "Paste the code, token, or full redirect URL here: " + ).strip() + if not user_input: + sys.exit(2) + + code = None + session_id = None + + # Check for Access Token first + if "access_token=" in user_input: + try: + parsed = urlparse(user_input) + if parsed.fragment: + qs = parse_qs(parsed.fragment) + if "access_token" in qs: + session_id = qs["access_token"][0] + if not session_id and parsed.query: + qs = parse_qs(parsed.query) + if "access_token" in qs: + session_id = qs["access_token"][0] + except: + pass + + if ( + not session_id + and len(user_input) > 50 + and "=" not in user_input + and "http" not in user_input + ): + session_id = user_input + + if session_id: + self.headers.update({"Authorization": f"Bearer {session_id}"}) + self.cookies.set( + "fssessionid", session_id, domain=".familysearch.org" + ) + self.set_current(auto_login=False) + if self.logged and self.fid: + self.save_cookies() + print("\nSuccess! Session established via Token.") + return + else: + print("\nToken appeared invalid. Try again.") + continue + + # Check for Code + if "code=" in user_input: + try: + parsed = urlparse(user_input) + qs = parse_qs(parsed.query) + if "code" in qs: + code = qs["code"][0] + except: + pass + elif len(user_input) < 50: + code = user_input + + if code: + url = "https://ident.familysearch.org/cis-web/oauth2/v3/token" + try: + # Raw request to avoid cache + res = requests.post( + url, + data={ + "grant_type": "authorization_code", + "client_id": self.client_id, + "code": code, + "redirect_uri": self.redirect_uri, + }, + headers=self.headers, + timeout=self.timeout, + ) + + data = res.json() + if "access_token" in data: + session_id = data["access_token"] + self.headers.update( + {"Authorization": f"Bearer {session_id}"} + ) + self.cookies.set( + "fssessionid", session_id, domain=".familysearch.org" + ) + self.set_current(auto_login=False) + if self.logged and self.fid: + self.save_cookies() + print("\nSuccess! Session established via Code.") + return + + error_desc = data.get( + "error_description", data.get("error", "Unknown error") + ) + print(f"\nToken exchange failed: {error_desc}") + + except Exception as e: + print(f"\nError during token exchange: {e}") + + print("Invalid input or failed login. Please try again.") + + except (EOFError, KeyboardInterrupt): + print("\nLogin cancelled.") + sys.exit(2) + + def get_url(self, url, headers=None, auto_login=True): """retrieve JSON structure from a FamilySearch URL""" self.counter += 1 if headers is None: @@ -157,6 +430,7 @@ class GMASession: while True: try: self.write_log("Downloading: " + url) + # Used HEAD logic here (explicit API URL) r = self.get( "https://api.familysearch.org" + url, timeout=self.timeout, @@ -170,14 +444,19 @@ class GMASession: time.sleep(self.timeout) continue self.write_log("Status code: %s" % r.status_code) + if self.verbose and hasattr(r, "from_cache") and r.from_cache: + self.write_log("CACHE HIT: " + url) if r.status_code == 204: return None if r.status_code in {404, 405, 410, 500}: self.write_log("WARNING: " + url) return None if r.status_code == 401: - self.login() - continue + if auto_login: + self.login() + continue + else: + return None try: r.raise_for_status() except requests.exceptions.HTTPError: @@ -206,31 +485,78 @@ class GMASession: self.write_log("WARNING: corrupted file from %s, error: %s" % (url, e)) return None - def set_current(self): + def set_current(self, auto_login=True): """retrieve FamilySearch current user ID, name and language""" url = "/platform/users/current" - data = self.get_url(url) + data = self.get_url(url, auto_login=auto_login) if data: self.fid = data["users"][0]["personId"] self.lang = data["users"][0]["preferredLanguage"] self.display_name = data["users"][0]["displayName"] def _(self, string): - """translate a string into user's language - TODO replace translation file for gettext format - """ + """translate a string into user's language""" if string in translations and self.lang in translations[string]: return translations[string][self.lang] return string class CachedSession(GMASession, CSession): + def __init__( + self, + username, + password, + client_id=None, + redirect_uri=None, + verbose=False, + logfile=False, + timeout=60, + cache_control=True, + ): + # Persistence setup + os.makedirs("http_cache", exist_ok=True) + # Use SQLite backend as per requirement + CSession.__init__( + self, + "http_cache/requests", + backend="sqlite", + expire_after=86400, + allowable_codes=(200, 204), + table_name="responses", + cache_control=cache_control, # Enable HTTP conditional requests (ETag/Last-Modified) + ) + GMASession.__init__( + self, + username, + password, + client_id, + redirect_uri, + verbose=verbose, + logfile=logfile, + timeout=timeout, + ) - def __init__(self, username, password, verbose=False, logfile=False, timeout=60): - CSession.__init__(self, 'http_cache', backend='filesystem', expire_after=86400) - GMASession.__init__(self, username, password, verbose=verbose, logfile=logfile, timeout=timeout) -class Session(GMASession, requests.Session): - def __init__(self, username, password, verbose=False, logfile=False, timeout=60): +class Session(GMASession, requests.Session): + def __init__( + self, + username, + password, + client_id=None, + redirect_uri=None, + verbose=False, + logfile=False, + timeout=60, + cache_control=True, # Ignored for non-cached sessions + ): requests.Session.__init__(self) - GMASession.__init__(self, username, password, verbose=verbose, logfile=logfile, timeout=timeout) + GMASession.__init__( + self, + username, + password, + client_id, + redirect_uri, + verbose=verbose, + logfile=logfile, + timeout=timeout, + ) diff --git a/getmyancestors/classes/translation.py b/getmyancestors/classes/translation.py index 06532ba..d125eea 100644 --- a/getmyancestors/classes/translation.py +++ b/getmyancestors/classes/translation.py @@ -118,16 +118,16 @@ translations = { "Cut": {"fr": "Couper"}, "Paste": {"fr": "Coller"}, "Username:": { - "fr": "Nom d'utilisateur :", - "de": "Benutzername:", + "fr": "Nom d'utilisateur :", + "de": "Benutzername:", }, "Password:": { - "fr": "Mot de passe :", - "de": "Passwort:", + "fr": "Mot de passe :", + "de": "Passwort:", }, "Save Password": { - "fr": "Enregistrer le mot de passe", - "de": "Passwort speichern", + "fr": "Enregistrer le mot de passe", + "de": "Passwort speichern", }, "ID already exist": {"fr": "Cet identifiant existe déjà"}, "Invalid FamilySearch ID: ": {"fr": "Identifiant FamilySearch invalide : "}, diff --git a/getmyancestors/classes/tree.py b/getmyancestors/classes/tree.py index 9091150..3ca740e 100644 --- a/getmyancestors/classes/tree.py +++ b/getmyancestors/classes/tree.py @@ -1,75 +1,74 @@ -import sys -import re -import time import asyncio import os -from urllib.parse import unquote, unquote_plus +import re +import sys +import time +import xml.etree.cElementTree as ET from datetime import datetime -from typing import Set, Dict, List, Tuple, Union, Optional, BinaryIO, Any +from typing import Any, BinaryIO, Dict, List, Optional, Set, Tuple +from urllib.parse import unquote, unquote_plus +from xml.etree.cElementTree import Element + # global imports import babelfish import geocoder import requests -import xml.etree.cElementTree as ET -from xml.etree.cElementTree import Element from requests_cache import CachedSession # local imports -import getmyancestors +from getmyancestors import __version__ from getmyancestors.classes.constants import ( - MAX_PERSONS, FACT_EVEN, FACT_TAGS, + MAX_PERSONS, ORDINANCES_STATUS, ) - -COUNTY = 'County' -COUNTRY = 'Country' -CITY = 'City' +COUNTY = "County" +COUNTRY = "Country" +CITY = "City" GEONAME_FEATURE_MAP = { - 'ADM1': COUNTY, # first-order administrative division a primary administrative division of a country, such as a state in the United States - 'ADM1H': COUNTY, # historical first-order administrative division a former first-order administrative division - 'ADM2': COUNTY, # second-order administrative division a subdivision of a first-order administrative division - 'ADM2H': COUNTY, # historical second-order administrative division a former second-order administrative division - 'ADM3': COUNTY, # third-order administrative division a subdivision of a second-order administrative division - 'ADM3H': COUNTY, # historical third-order administrative division a former third-order administrative division - 'ADM4': COUNTY, # fourth-order administrative division a subdivision of a third-order administrative division - 'ADM4H': COUNTY, # historical fourth-order administrative division a former fourth-order administrative division - 'ADM5': COUNTY, # fifth-order administrative division a subdivision of a fourth-order administrative division - 'ADM5H': COUNTY, # historical fifth-order administrative division a former fifth-order administrative division - 'ADMD': COUNTY, # administrative division an administrative division of a country, undifferentiated as to administrative level - 'ADMDH': COUNTY, # historical administrative division a former administrative division of a political entity, undifferentiated as to administrative level + "ADM1": COUNTY, # first-order administrative division a primary administrative division of a country, such as a state in the United States + "ADM1H": COUNTY, # historical first-order administrative division a former first-order administrative division + "ADM2": COUNTY, # second-order administrative division a subdivision of a first-order administrative division + "ADM2H": COUNTY, # historical second-order administrative division a former second-order administrative division + "ADM3": COUNTY, # third-order administrative division a subdivision of a second-order administrative division + "ADM3H": COUNTY, # historical third-order administrative division a former third-order administrative division + "ADM4": COUNTY, # fourth-order administrative division a subdivision of a third-order administrative division + "ADM4H": COUNTY, # historical fourth-order administrative division a former fourth-order administrative division + "ADM5": COUNTY, # fifth-order administrative division a subdivision of a fourth-order administrative division + "ADM5H": COUNTY, # historical fifth-order administrative division a former fifth-order administrative division + "ADMD": COUNTY, # administrative division an administrative division of a country, undifferentiated as to administrative level + "ADMDH": COUNTY, # historical administrative division a former administrative division of a political entity, undifferentiated as to administrative level # 'LTER': leased area a tract of land leased to another country, usually for military installations - 'PCL': COUNTRY, # political entity - 'PCLD': COUNTRY, # dependent political entity - 'PCLF': COUNTRY, # freely associated state - 'PCLH': COUNTRY, # historical political entity a former political entity - 'PCLI': COUNTRY, # independent political entity - 'PCLIX': COUNTRY, # section of independent political entity - 'PCLS': COUNTRY, # semi-independent political entity - - 'PPL': CITY, # populated place a city, town, village, or other agglomeration of buildings where people live and work - 'PPLA': CITY, # seat of a first-order administrative division seat of a first-order administrative division (PPLC takes precedence over PPLA) - 'PPLA2': CITY, # seat of a second-order administrative division - 'PPLA3': CITY, # seat of a third-order administrative division - 'PPLA4': CITY, # seat of a fourth-order administrative division - 'PPLA5': CITY, # seat of a fifth-order administrative division - 'PPLC': CITY, # capital of a political entity - 'PPLCH': CITY, # historical capital of a political entity a former capital of a political entity - 'PPLF': CITY, # farm village a populated place where the population is largely engaged in agricultural activities - 'PPLG': CITY, # seat of government of a political entity - 'PPLH': CITY, # historical populated place a populated place that no longer exists - 'PPLL': CITY, # populated locality an area similar to a locality but with a small group of dwellings or other buildings - 'PPLQ': CITY, # abandoned populated place - 'PPLR': CITY, # religious populated place a populated place whose population is largely engaged in religious occupations - 'PPLS': CITY, # populated places cities, towns, villages, or other agglomerations of buildings where people live and work - 'PPLW': CITY, # destroyed populated place a village, town or city destroyed by a natural disaster, or by war - 'PPLX': CITY, # section of populated place - + "PCL": COUNTRY, # political entity + "PCLD": COUNTRY, # dependent political entity + "PCLF": COUNTRY, # freely associated state + "PCLH": COUNTRY, # historical political entity a former political entity + "PCLI": COUNTRY, # independent political entity + "PCLIX": COUNTRY, # section of independent political entity + "PCLS": COUNTRY, # semi-independent political entity + "PPL": CITY, # populated place a city, town, village, or other agglomeration of buildings where people live and work + "PPLA": CITY, # seat of a first-order administrative division seat of a first-order administrative division (PPLC takes precedence over PPLA) + "PPLA2": CITY, # seat of a second-order administrative division + "PPLA3": CITY, # seat of a third-order administrative division + "PPLA4": CITY, # seat of a fourth-order administrative division + "PPLA5": CITY, # seat of a fifth-order administrative division + "PPLC": CITY, # capital of a political entity + "PPLCH": CITY, # historical capital of a political entity a former capital of a political entity + "PPLF": CITY, # farm village a populated place where the population is largely engaged in agricultural activities + "PPLG": CITY, # seat of government of a political entity + "PPLH": CITY, # historical populated place a populated place that no longer exists + "PPLL": CITY, # populated locality an area similar to a locality but with a small group of dwellings or other buildings + "PPLQ": CITY, # abandoned populated place + "PPLR": CITY, # religious populated place a populated place whose population is largely engaged in religious occupations + "PPLS": CITY, # populated places cities, towns, villages, or other agglomerations of buildings where people live and work + "PPLW": CITY, # destroyed populated place a village, town or city destroyed by a natural disaster, or by war + "PPLX": CITY, # section of populated place } + # getmyancestors classes and functions def cont(string): """parse a GEDCOM line adding CONT and CONT tags if necessary""" @@ -95,6 +94,7 @@ def cont(string): max_len = 248 return ("\n%s CONT " % level).join(res) + "\n" + class Note: """GEDCOM Note class :param text: the Note content @@ -106,51 +106,59 @@ class Note: def __init__(self, text="", tree=None, num=None, num_prefix=None, note_type=None): self._handle = None - self.note_type = note_type or 'Source Note' + self.note_type = note_type or "Source Note" self.num_prefix = num_prefix if num: self.num = num else: - Note.counter[num_prefix or 'None'] = Note.counter.get(num_prefix or 'None', 0) + 1 - self.num = Note.counter[num_prefix or 'None'] - print(f'##### Creating Note: {num_prefix}, {self.num}', file=sys.stderr) + Note.counter[num_prefix or "None"] = ( + Note.counter.get(num_prefix or "None", 0) + 1 + ) + self.num = Note.counter[num_prefix or "None"] + print(f"##### Creating Note: {num_prefix}, {self.num}", file=sys.stderr) self.text = text.strip() if tree: tree.notes.append(self) + def __str__(self): + """Return readable string for debugging/reference purposes.""" + return f"{self.num}. {self.text}" + @property def id(self): - return f'{self.num_prefix}_{self.num}' if self.num_prefix != None else self.num + return ( + f"{self.num_prefix}_{self.num}" if self.num_prefix is not None else self.num + ) def print(self, file=sys.stdout): """print Note in GEDCOM format""" - print(f'Note: {self.text}', file=sys.stderr) + print(f"Note: {self.text}", file=sys.stderr) file.write(cont("0 @N%s@ NOTE %s" % (self.id, self.text))) def link(self, file=sys.stdout, level=1): """print the reference in GEDCOM format""" - print(f'Linking Note: {self.id}', file=sys.stderr) + print(f"Linking Note: {self.id}", file=sys.stderr) file.write("%s NOTE @N%s@\n" % (level, self.id)) - @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle def printxml(self, parent_element: Element) -> None: note_element = ET.SubElement( parent_element, - 'note', + "note", handle=self.handle, - # change='1720382308', - id=self.id, - type='Source Note' + # change='1720382308', + id=self.id, + type="Source Note", ) - ET.SubElement(note_element, 'text').text = self.text + ET.SubElement(note_element, "text").text = self.text + class Source: """GEDCOM Source class @@ -185,25 +193,30 @@ class Source: if "titles" in data: self.title = data["titles"][0]["value"] if "notes" in data: - notes = [ n['text'] for n in data["notes"] if n["text"] ] + notes = [n["text"] for n in data["notes"] if n["text"]] for idx, n in enumerate(notes): - self.notes.add(Note( - n, - self.tree, - num="S%s-%s" % (self.id, idx), - note_type='Source Note' - )) - self.modified = data['attribution']['modified'] + self.notes.add( + Note( + n, + self.tree, + num="S%s-%s" % (self.id, idx), + note_type="Source Note", + ) + ) + self.modified = data["attribution"]["modified"] + + def __str__(self): + """Return readable string for debugging/reference purposes.""" + return f"{self.num}. {self.title}" @property def id(self): - return 'S' + str(self.fid or self.num) - + return "S" + str(self.fid or self.num) @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle @@ -225,28 +238,27 @@ class Source: file.write("%s SOUR @S%s@\n" % (level, self.id)) def printxml(self, parent_element: Element) -> None: - - # - # Palkovics Cser József, "Hungary Civil Registration, 1895-1980" - # "Hungary Civil Registration, 1895-1980", , <i>FamilySearch</i> (https://www.familysearch.org/ark:/61903/1:1:6JBQ-NKWD : Thu Mar 07 10:23:43 UTC 2024), Entry for Palkovics Cser József and Palkovics Cser István, 27 Aug 1928. - # https://familysearch.org/ark:/61903/1:1:6JBQ-NKWD - # - # + # + # Palkovics Cser József, "Hungary Civil Registration, 1895-1980" + # "Hungary Civil Registration, 1895-1980", , <i>FamilySearch</i> (https://www.familysearch.org/ark:/61903/1:1:6JBQ-NKWD : Thu Mar 07 10:23:43 UTC 2024), Entry for Palkovics Cser József and Palkovics Cser István, 27 Aug 1928. + # https://familysearch.org/ark:/61903/1:1:6JBQ-NKWD + # + # source_element = ET.SubElement( parent_element, - 'source', + "source", handle=self.handle, change=str(int(self.modified / 1000)), - id=self.id + id=self.id, ) if self.title: - ET.SubElement(source_element, 'stitle').text = self.title + ET.SubElement(source_element, "stitle").text = self.title if self.citation: - ET.SubElement(source_element, 'sauthor').text = self.citation + ET.SubElement(source_element, "sauthor").text = self.citation if self.url: - ET.SubElement(source_element, 'spubinfo').text = self.url + ET.SubElement(source_element, "spubinfo").text = self.url if self.fid: - ET.SubElement(source_element, 'srcattribute', type='REFN', value=self.fid) + ET.SubElement(source_element, "srcattribute", type="REFN", value=self.fid) class Fact: @@ -257,11 +269,12 @@ class Fact: counter = {} - def __init__(self, data=None, tree: Optional['Tree']=None, num_prefix=None): + def __init__(self, data=None, tree: Optional["Tree"] = None, num_prefix=None): self.value = self.type = self.date = None self.date_type = None self.place: Optional[Place] = None self.note = None + self.map = None self._handle: Optional[str] = None if data: if "value" in data: @@ -276,33 +289,43 @@ class Fact: elif self.type not in FACT_TAGS: self.type = None - - self.num_prefix = f'{num_prefix}_{FACT_TAGS[self.type]}' if num_prefix and self.type in FACT_TAGS else num_prefix - Fact.counter[self.num_prefix or 'None'] = Fact.counter.get(self.num_prefix or 'None', 0) + 1 - self.num = Fact.counter[self.num_prefix or 'None'] + self.num_prefix = ( + f"{num_prefix}_{FACT_TAGS[self.type]}" + if num_prefix and self.type in FACT_TAGS + else num_prefix + ) + Fact.counter[self.num_prefix or "None"] = ( + Fact.counter.get(self.num_prefix or "None", 0) + 1 + ) + self.num = Fact.counter[self.num_prefix or "None"] if data: if "date" in data: - if 'formal' in data['date']: - self.date = data['date']['formal'].split('+')[-1].split('/')[0] - if data['date']['formal'].startswith('A+'): - self.date_type = 'about' - if data['date']['formal'].startswith('/+'): - self.date_type = 'before' - if data['date']['formal'].endswith('/'): - self.date_type = 'after' + if "formal" in data["date"]: + self.date = data["date"]["formal"].split("+")[-1].split("/")[0] + if data["date"]["formal"].startswith("A+"): + self.date_type = "about" + if data["date"]["formal"].startswith("/+"): + self.date_type = "before" + if data["date"]["formal"].endswith("/"): + self.date_type = "after" else: self.date = data["date"]["original"] if "place" in data: place = data["place"] place_name = place["original"] - place_id = place["description"][1:] if "description" in place and place["description"][1:] in tree.places else None + place_id = ( + place["description"][1:] + if "description" in place + and place["description"][1:] in tree.places + else None + ) self.place = tree.ensure_place(place_name, place_id) if "changeMessage" in data["attribution"]: self.note = Note( - data["attribution"]["changeMessage"], + data["attribution"]["changeMessage"], tree, - num_prefix='E' + self.num_prefix if self.num_prefix else None, - note_type='Event Note', + num_prefix="E" + self.num_prefix if self.num_prefix else None, + note_type="Event Note", ) if self.type == "http://gedcomx.org/Death" and not ( self.date or self.place @@ -311,47 +334,46 @@ class Fact: if tree: tree.facts.add(self) - @property def id(self): - return f'{self.num_prefix}_{self.num}' if self.num_prefix != None else self.num - + return ( + f"{self.num_prefix}_{self.num}" if self.num_prefix is not None else self.num + ) @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle def printxml(self, parent_element): - event_element = ET.SubElement( parent_element, - 'event', + "event", handle=self.handle, # change='1720382301', - id=self.id + id=self.id, ) - ET.SubElement(event_element, 'type').text = ( - unquote_plus(self.type[len('http://gedcomx.org/'):]) - if self.type.startswith('http://gedcomx.org/') + ET.SubElement(event_element, "type").text = ( + unquote_plus(self.type[len("http://gedcomx.org/") :]) + if self.type.startswith("http://gedcomx.org/") else self.type ) # FACT_TAGS.get(self.type, self.type) if self.date: - params={ - 'val': self.date, + params = { + "val": self.date, } if self.date_type is not None: - params['type'] = self.date_type - ET.SubElement(event_element, 'datestr', **params) + params["type"] = self.date_type + ET.SubElement(event_element, "datestr", **params) if self.place: - ET.SubElement(event_element, 'place', hlink=self.place.handle) + ET.SubElement(event_element, "place", hlink=self.place.handle) if self.note: - ET.SubElement(event_element, 'noteref', hlink=self.note.handle) + ET.SubElement(event_element, "noteref", hlink=self.note.handle) def print(self, file=sys.stdout): """print Fact in GEDCOM format @@ -405,20 +427,23 @@ class Memorie: NAME_MAP = { - "preferred" : 'Preeferred Name', - "nickname" : 'Nickname', - "birthname": 'Birth Name', - "aka": 'Also Known As', - "married": 'Married Name', + "preferred": "Preeferred Name", + "nickname": "Nickname", + "birthname": "Birth Name", + "aka": "Also Known As", + "married": "Married Name", } + class Name: """GEDCOM Name class :param data: FS Name data :param tree: a Tree object """ - def __init__(self, data=None, tree=None, owner_fis=None, kind=None, alternative: bool=False): + def __init__( + self, data=None, tree=None, owner_fis=None, kind=None, alternative: bool = False + ): self.given = "" self.surname = "" self.prefix = None @@ -442,22 +467,24 @@ class Name: self.note = Note( data["attribution"]["changeMessage"], tree, - num_prefix=f'NAME_{owner_fis}_{kind}', - note_type='Name Note', + note_type="Name Note", ) + def __str__(self): + """Return readable string for debugging/reference purposes.""" + return f"{self.given} {self.surname}" + def printxml(self, parent_element): params = {} if self.kind is not None: - params['type'] = NAME_MAP.get(self.kind, self.kind) + params["type"] = NAME_MAP.get(self.kind, self.kind) if self.alternative: - params['alt'] = '1' - person_name = ET.SubElement(parent_element, 'name', **params) - ET.SubElement(person_name, 'first').text = self.given - ET.SubElement(person_name, 'surname').text = self.surname + params["alt"] = "1" + person_name = ET.SubElement(parent_element, "name", **params) + ET.SubElement(person_name, "first").text = self.given + ET.SubElement(person_name, "surname").text = self.surname # TODO prefix / suffix - def print(self, file=sys.stdout, typ=None): """print Name in GEDCOM format :param typ: type for additional names @@ -474,7 +501,6 @@ class Name: self.note.link(file, 2) - class Place: """GEDCOM Place class :param name: the place name @@ -485,13 +511,14 @@ class Place: counter = 0 def __init__( - self, - id: str, - name: str, - type: Optional[str]=None, - parent: Optional['Place']=None, - latitude: Optional[float]=None, - longitude: Optional[float]=None): + self, + id: str, + name: str, + type: Optional[str] = None, + parent: Optional["Place"] = None, + latitude: Optional[float] = None, + longitude: Optional[float] = None, + ): self._handle = None self.name = name self.type = type @@ -503,39 +530,39 @@ class Place: @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle - def print(self, file=sys.stdout, indentation=0): """print Place in GEDCOM format""" - file.write("%d @P%s@ PLAC %s\n" % (indentation, self.num, self.name)) + file.write("%d @P%s@ PLAC %s\n" % (indentation, self.id, self.name)) def printxml(self, parent_element): - - - # - # - # - # - # - # - # + # + # + # + # + # + # + # place_element = ET.SubElement( - parent_element, - 'placeobj', + parent_element, + "placeobj", handle=self.handle, # change='1720382307', id=self.id, - type=self.type or 'Unknown' + type=self.type or "Unknown", ) # ET.SubElement(place_element, 'ptitle').text = self.name - ET.SubElement(place_element, 'pname', value=self.name) + ET.SubElement(place_element, "pname", value=self.name) if self.parent: - ET.SubElement(place_element, 'placeref', hlink=self.parent.handle) + ET.SubElement(place_element, "placeref", hlink=self.parent.handle) if self.latitude and self.longitude: - ET.SubElement(place_element, 'coord', long=str(self.longitude), lat=str(self.latitude)) + ET.SubElement( + place_element, "coord", long=str(self.longitude), lat=str(self.latitude) + ) + class Ordinance: """GEDCOM Ordinance class @@ -562,8 +589,8 @@ class Ordinance: if self.famc: file.write("2 FAMC @F%s@\n" % self.famc.num) -class Citation: +class Citation: def __init__(self, data: Dict[str, Any], source: Source): self._handle = None self.id = data["id"] @@ -574,33 +601,31 @@ class Citation: else None ) # TODO create citation note out of this. - self.modified = data['attribution']['modified'] + self.modified = data["attribution"]["modified"] - @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle def printxml(self, parent_element: Element): - -# -# -# 2 -# -# -# + # + # + # 2 + # + # + # citation_element = ET.SubElement( parent_element, - 'citation', + "citation", handle=self.handle, change=str(int(self.modified / 1000)), - id='C' + str(self.id) + id="C" + str(self.id), ) - ET.SubElement(citation_element, 'confidence').text = '2' - ET.SubElement(citation_element, 'sourceref', hlink=self.source.handle) + ET.SubElement(citation_element, "confidence").text = "2" + ET.SubElement(citation_element, "sourceref", hlink=self.source.handle) class Indi: @@ -612,7 +637,7 @@ class Indi: counter = 0 - def __init__(self, fid: str, tree: 'Tree', num=None): + def __init__(self, fid: Optional[str] = None, tree: "Tree" = None, num=None): self._handle = None if num: self.num = num @@ -621,20 +646,22 @@ class Indi: self.num = Indi.counter self.fid = fid self.tree = tree - self.famc: Set['Fam'] = set() - self.fams: Set['Fam'] = set() - # self.famc_fid = set() - # self.fams_fid = set() - # self.famc_num = set() - # self.fams_num = set() - # self.famc_ids = set() - # self.fams_ids = set() + self.famc: Set["Fam"] = set() + self.fams: Set["Fam"] = set() + self.famc_fid = set() + self.fams_fid = set() + self.famc_num = set() + self.fams_num = set() + self.famc_ids = set() + self.fams_ids = set() self.name: Optional[Name] = None self.gender = None self.living = None - self.parents: Set[Tuple[str, str]] = set() # (father_id, mother_id) - self.spouses: Set[Tuple[str, str, str]] = set() # (person1, person2, relfid) - self.children: Set[Tuple[str, str, str]] = set() # (father_id, mother_id, child_id) + self.parents: Set[Tuple[str, str]] = set() # (father_id, mother_id) + self.spouses: Set[Tuple[str, str, str]] = set() # (person1, person2, relfid) + self.children: Set[Tuple[str, str, str]] = ( + set() + ) # (father_id, mother_id, child_id) self.baptism = self.confirmation = self.initiatory = None self.endowment = self.sealing_child = None self.nicknames: Set[Name] = set() @@ -643,16 +670,20 @@ class Indi: self.aka: Set[Name] = set() self.facts: Set[Fact] = set() self.notes: Set[Note] = set() - # self.sources: Set[Source] = set() + self.sources = set() self.citations: Set[Citation] = set() self.memories = set() + def __str__(self): + """Return readable string for debugging/reference purposes.""" + return f"{self.num}. {self.name}, fam: {self.fid}" + def add_data(self, data): """add FS individual data""" if data: self.living = data["living"] for x in data["names"]: - alt = not x.get('preferred', False) + alt = not x.get("preferred", False) if x["type"] == "http://gedcomx.org/Nickname": self.nicknames.add(Name(x, self.tree, self.fid, "nickname", alt)) elif x["type"] == "http://gedcomx.org/BirthName": @@ -662,8 +693,8 @@ class Indi: elif x["type"] == "http://gedcomx.org/MarriedName": self.married.add(Name(x, self.tree, self.fid, "married", alt)) else: - print('Unknown name type: ' + x.get('type'), file=sys.stderr) - raise 'Unknown name type' + print("Unknown name type: " + x.get("type"), file=sys.stderr) + raise "Unknown name type" if "gender" in data: if data["gender"]["type"] == "http://gedcomx.org/Male": self.gender = "M" @@ -679,28 +710,34 @@ class Indi: "=== %s ===\n%s" % (self.tree.fs._("Life Sketch"), x.get("value", "")), self.tree, - num_prefix=f'INDI_{self.fid}', - note_type='Person Note', + num_prefix=f"INDI_{self.fid}", + note_type="Person Note", ) ) else: - self.facts.add(Fact(x, self.tree, num_prefix=f'INDI_{self.fid}')) + self.facts.add( + Fact(x, self.tree, num_prefix=f"INDI_{self.fid}") + ) if "sources" in data: sources = self.tree.fs.get_url( "/platform/tree/persons/%s/sources" % self.fid ) if sources: - quotes = dict() for quote in sources["persons"][0]["sources"]: source_id = quote["descriptionId"] source_data = next( - (s for s in sources['sourceDescriptions'] if s['id'] == source_id), + ( + s + for s in sources["sourceDescriptions"] + if s["id"] == source_id + ), None, ) source = self.tree.ensure_source(source_data) if source: citation = self.tree.ensure_citation(quote, source) self.citations.add(citation) + self.sources.add((source, citation.message)) for evidence in data.get("evidence", []): memory_id, *_ = evidence["id"].partition("-") @@ -718,23 +755,24 @@ class Indi: Note( text, self.tree, - num_prefix=f'INDI_{self.fid}', - note_type='Person Note', - )) + num_prefix=f"INDI_{self.fid}", + note_type="Person Note", + ) + ) else: self.memories.add(Memorie(x)) - def add_fams(self, fam: 'Fam'): + def add_fams(self, fam: "Fam"): """add family fid (for spouse or parent)""" self.fams.add(fam) - def add_famc(self, fam: 'Fam'): + def add_famc(self, fam: "Fam"): """add family fid (for child)""" self.famc.add(fam) def get_notes(self): """retrieve individual notes""" - print(f'Getting Notes for {self.fid}', file=sys.stderr) + print(f"Getting Notes for {self.fid}", file=sys.stderr) notes = self.tree.fs.get_url("/platform/tree/persons/%s/notes" % self.fid) if notes: for n in notes["persons"][0]["notes"]: @@ -744,9 +782,10 @@ class Indi: Note( text_note, self.tree, - num_prefix=f'INDI_{self.fid}', - note_type='Person Note', - )) + num_prefix=f"INDI_{self.fid}", + note_type="Person Note", + ) + ) def get_ordinances(self): """retrieve LDS ordinances @@ -757,7 +796,7 @@ class Indi: if self.living: return res, famc url = "/service/tree/tree-data/reservations/person/%s/ordinances" % self.fid - data = self.tree.fs.get_url(url, {}) + data = self.tree.fs.get_url(url, {}, no_api=True) if data: for key, o in data["data"].items(): if key == "baptism": @@ -798,22 +837,27 @@ class Indi: if n.text == text: self.notes.add(n) return - self.notes.add(Note(text, self.tree, num_prefix=f'INDI_{self.fid}_CONTRIB', note_type='Contribution Note')) + self.notes.add( + Note( + text, + self.tree, + num_prefix=f"INDI_{self.fid}_CONTRIB", + note_type="Contribution Note", + ) + ) @property def id(self): return self.fid or self.num - @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle def printxml(self, parent_element): - # # M # @@ -827,47 +871,46 @@ class Indi: # # # - person = ET.SubElement(parent_element, - 'person', - handle=self.handle, - # change='1720382301', - id='I' + str(self.id)) + person = ET.SubElement( + parent_element, + "person", + handle=self.handle, + # change='1720382301', + id="I" + str(self.id), + ) if self.fid: - ET.SubElement(person, 'attribute', type='_FSFTID', value=self.fid) + ET.SubElement(person, "attribute", type="_FSFTID", value=self.fid) if self.name: self.name.printxml(person) for name in self.nicknames | self.birthnames | self.aka | self.married: name.printxml(person) - - gender = ET.SubElement(person, 'gender') + + gender = ET.SubElement(person, "gender") gender.text = self.gender - + if self.fams: for fam in self.fams: - ET.SubElement(person, 'parentin', hlink=fam.handle) + ET.SubElement(person, "parentin", hlink=fam.handle) if self.famc: for fam in self.famc: - ET.SubElement(person, 'childof', hlink=fam.handle) + ET.SubElement(person, "childof", hlink=fam.handle) + ET.SubElement(person, "attribute", type="_FSFTID", value=self.fid) - ET.SubElement(person, 'attribute', type="_FSFTID", value=self.fid) - - for fact in self.facts: - ET.SubElement(person, 'eventref', hlink=fact.handle, role='Primary') + ET.SubElement(person, "eventref", hlink=fact.handle, role="Primary") for citation in self.citations: - ET.SubElement(person, 'citationref', hlink=citation.handle) + ET.SubElement(person, "citationref", hlink=citation.handle) for note in self.notes: - ET.SubElement(person, 'noteref', hlink=note.handle) + ET.SubElement(person, "noteref", hlink=note.handle) # # - def print(self, file=sys.stdout): """print individual in GEDCOM format""" file.write("0 @I%s@ INDI\n" % self.id) @@ -910,7 +953,7 @@ class Indi: # for num in self.fams_ids: # print(f'Famc Ids: {self.famc_ids}', file=sys.stderr) # for num in self.famc_ids: - # file.write("1 FAMC @F%s@\n" % num) + # file.write("1 FAMC @F%s@\n" % num) file.write("1 _FSFTID %s\n" % self.fid) for o in self.notes: o.link(file) @@ -930,9 +973,15 @@ class Fam: counter = 0 - def __init__(self, husband: Indi | None, wife: Indi | None, tree: 'Tree'): + def __init__( + self, + husband: Indi | None = None, + wife: Indi | None = None, + tree: "Tree" = None, + num=None, + ): self._handle = None - self.num = Fam.gen_id(husband, wife) + self.num = num if num else Fam.gen_id(husband, wife) self.fid = None self.husband = husband self.wife = wife @@ -940,27 +989,33 @@ class Fam: self.children: Set[Indi] = set() self.facts: Set[Fact] = set() self.sealing_spouse = None + self.husb_num = None + self.wife_num = None + self.chil_num = set() + self.husb_fid = None + self.wife_fid = None + self.chil_fid = set() self.notes = set() self.sources = set() @property def handle(self): if not self._handle: - self._handle = '_' + os.urandom(10).hex() + self._handle = "_" + os.urandom(10).hex() return self._handle - + @staticmethod def gen_id(husband: Indi | None, wife: Indi | None) -> str: if husband and wife: - return f'FAM_{husband.id}-{wife.id}' + return f"FAM_{husband.id}-{wife.id}" elif husband: - return f'FAM_{husband.id}-UNK' + return f"FAM_{husband.id}-UNK" elif wife: - return f'FAM_UNK-{wife.id}' + return f"FAM_UNK-{wife.id}" else: Fam.counter += 1 - return f'FAM_UNK-UNK-{Fam.counter}' + return f"FAM_UNK-UNK-{Fam.counter}" def add_child(self, child: Indi | None): """add a child fid to the family""" @@ -978,7 +1033,7 @@ class Fam: if data: if "facts" in data["relationships"][0]: for x in data["relationships"][0]["facts"]: - self.facts.add(Fact(x, self.tree, num_prefix=f'FAM_{self.fid}')) + self.facts.add(Fact(x, self.tree, num_prefix=f"FAM_{self.fid}")) if "sources" in data["relationships"][0]: quotes = dict() for x in data["relationships"][0]["sources"]: @@ -1015,7 +1070,14 @@ class Fam: for n in notes["relationships"][0]["notes"]: text_note = "=== %s ===\n" % n["subject"] if "subject" in n else "" text_note += n["text"] + "\n" if "text" in n else "" - self.notes.add(Note(text_note, self.tree, num_prefix=f'FAM_{self.fid}', note_type='Marriage Note')) + self.notes.add( + Note( + text_note, + self.tree, + num_prefix=f"FAM_{self.fid}", + note_type="Marriage Note", + ) + ) def get_contributors(self): """retrieve contributors""" @@ -1038,12 +1100,19 @@ class Fam: if n.text == text: self.notes.add(n) return - self.notes.add(Note(text, self.tree, num_prefix=f'FAM_{self.fid}_CONTRIB', note_type='Contribution Note')) + self.notes.add( + Note( + text, + self.tree, + num_prefix=f"FAM_{self.fid}_CONTRIB", + note_type="Contribution Note", + ) + ) @property def id(self): return self.num - + def printxml(self, parent_element): # # @@ -1052,20 +1121,22 @@ class Fam: # # # - family = ET.SubElement(parent_element, - 'family', - handle=self.handle, - # change='1720382301', - id=self.id) - ET.SubElement(family, 'rel', type='Unknown') + family = ET.SubElement( + parent_element, + "family", + handle=self.handle, + # change='1720382301', + id=self.id, + ) + ET.SubElement(family, "rel", type="Unknown") if self.husband: - ET.SubElement(family, 'father', hlink=self.husband.handle) + ET.SubElement(family, "father", hlink=self.husband.handle) if self.wife: - ET.SubElement(family, 'mother', hlink=self.wife.handle) + ET.SubElement(family, "mother", hlink=self.wife.handle) for child in self.children: - ET.SubElement(family, 'childref', hlink=child.handle) + ET.SubElement(family, "childref", hlink=child.handle) for fact in self.facts: - ET.SubElement(family, 'eventref', hlink=fact.handle, role='Primary') + ET.SubElement(family, "eventref", hlink=fact.handle, role="Primary") def print(self, file=sys.stdout): """print family information in GEDCOM format""" @@ -1096,7 +1167,12 @@ class Tree: :param fs: a Session object """ - def __init__(self, fs: Optional[requests.Session]=None, exclude: List[str]=None, geonames_key=None): + def __init__( + self, + fs: Optional[requests.Session] = None, + exclude: List[str] = None, + geonames_key=None, + ): self.fs = fs self.geonames_key = geonames_key self.indi: Dict[str, Indi] = dict() @@ -1115,7 +1191,13 @@ class Tree: self.display_name = fs.display_name self.lang = babelfish.Language.fromalpha2(fs.lang).name - self.geosession = CachedSession('http_cache', backend='filesystem', expire_after=86400) + self.geosession = CachedSession( + "http_cache/requests", + backend="sqlite", + expire_after=86400, + allowable_codes=(200,), + backend_kwargs={"table_name": "requests"}, + ) def add_indis(self, fids_in: List[str]): """add individuals to the family tree @@ -1126,9 +1208,7 @@ class Tree: if fid not in self.exclude: fids.append(fid) else: - print( - "Excluding %s from the family tree" % fid, file=sys.stderr - ) + print("Excluding %s from the family tree" % fid, file=sys.stderr) async def add_datas(loop, data): futures = set() @@ -1187,20 +1267,19 @@ class Tree: if source_data["id"] not in self.sources: self.sources[source_data["id"]] = Source(source_data, self) return self.sources.get(source_data["id"]) - + def ensure_citation(self, data: Dict[str, Any], source: Source) -> Citation: citation_id = data["id"] if citation_id not in self.citations: self.citations[citation_id] = Citation(data, source) return self.citations[citation_id] - def ensure_family(self, father: Optional['Indi'], mother: Optional['Indi']) -> Fam: + def ensure_family(self, father: Optional["Indi"], mother: Optional["Indi"]) -> Fam: fam_id = Fam.gen_id(father, mother) if fam_id not in self.fam: self.fam[fam_id] = Fam(father, mother, self) return self.fam[fam_id] - def place_by_geoname_id(self, id: str) -> Optional[Place]: for place in self.places: if place.id == id: @@ -1208,51 +1287,55 @@ class Tree: return None def get_by_geonames_id(self, geonames_id: str) -> Place: - print('Fetching place hierarchy for', geonames_id, file=sys.stderr) + print("Fetching place hierarchy for", geonames_id, file=sys.stderr) hierarchy = geocoder.geonames( geonames_id, key=self.geonames_key, - lang=['hu', 'en', 'de'], - method='hierarchy', + lang=["hu", "en", "de"], + method="hierarchy", session=self.geosession, ) if hierarchy and hierarchy.ok: last_place = None - for item in hierarchy.geojson.get('features', []): - properties = item.get('properties', {}) - code = properties.get('code') - - if code in ['AREA', 'CONT']: + for item in hierarchy.geojson.get("features", []): + properties = item.get("properties", {}) + code = properties.get("code") + + if code in ["AREA", "CONT"]: continue - - print('Properties', properties, file=sys.stderr) - id = 'GEO' + str(properties['geonames_id']) + + print("Properties", properties, file=sys.stderr) + id = "GEO" + str(properties["geonames_id"]) place = self.place_by_geoname_id(id) if place is None: place = Place( id, - properties.get('address'), - GEONAME_FEATURE_MAP.get(code, 'Unknown'), + properties.get("address"), + GEONAME_FEATURE_MAP.get(code, "Unknown"), last_place, - properties.get('lat'), - properties.get('lng') + properties.get("lat"), + properties.get("lng"), ) self.places.append(place) last_place = place return last_place - @property + @property def _next_place_counter(self): self.place_counter += 1 return self.place_counter - - def ensure_place(self, place_name: str, fid: Optional[str] = None, coord: Optional[Tuple[float, float]] = None) -> Place: + def ensure_place( + self, + place_name: str, + fid: Optional[str] = None, + coord: Optional[Tuple[float, float]] = None, + ) -> Place: if place_name not in self.places_by_names: place = None if self.geonames_key: - print('Fetching place', place_name, file=sys.stderr) + print("Fetching place", place_name, file=sys.stderr) geoname_record = geocoder.geonames( place_name, key=self.geonames_key, @@ -1263,10 +1346,14 @@ class Tree: if place is None: coord = self.place_cache.get(fid) if coord is None else coord place = Place( - 'PFSID' + fid if fid is not None else 'P' + str(self._next_place_counter), + ( + "PFSID" + fid + if fid is not None + else "P" + str(self._next_place_counter) + ), place_name, latitude=coord[0] if coord is not None else None, - longitude=coord[1] if coord is not None else None + longitude=coord[1] if coord is not None else None, ) self.places.append(place) self.places_by_names[place_name] = place @@ -1290,7 +1377,7 @@ class Tree: if child is not None: fam.add_child(child) child.add_famc(fam) - + if father is not None: father.add_fams(fam) if mother is not None: @@ -1317,8 +1404,8 @@ class Tree: and father in self.indi ): self.add_trio( - self.indi.get(father), - self.indi.get(mother), + self.indi.get(father), + self.indi.get(mother), self.indi.get(fid), ) return set(filter(None, parents)) @@ -1417,7 +1504,7 @@ class Tree: # ) # self.indi[fid].fams_num = set( # self.fam[(husb, wife)].num for husb, wife in self.indi[fid].fams_fid - # ) + # ) # self.indi[fid].famc_ids = set( # self.fam[(husb, wife)].id for husb, wife in self.indi[fid].famc_fid # ) @@ -1426,31 +1513,35 @@ class Tree: # ) def printxml(self, file: BinaryIO): - -# root = ET.Element("root") -# doc = ET.SubElement(root, "doc") - -# ET.SubElement(doc, "field1", name="blah").text = "some value1" -# ET.SubElement(doc, "field2", name="asdfasd").text = "some vlaue2" - -# tree = ET.ElementTree(root) -# tree.write("filename.xml") - -# -# -# -#
-# -# Barnabás Südy -# -#
+ # root = ET.Element("root") + # doc = ET.SubElement(root, "doc") + + # ET.SubElement(doc, "field1", name="blah").text = "some value1" + # ET.SubElement(doc, "field2", name="asdfasd").text = "some vlaue2" + + # tree = ET.ElementTree(root) + # tree.write("filename.xml") + + # + # + # + #
+ # + # Barnabás Südy + # + #
root = ET.Element("database", xmlns="http://gramps-project.org/xml/1.7.1/") header = ET.SubElement(root, "header") - ET.SubElement(header, "created", date=datetime.strftime(datetime.now(), "%Y-%m-%d"), version="5.2.2") + ET.SubElement( + header, + "created", + date=datetime.strftime(datetime.now(), "%Y-%m-%d"), + version="5.2.2", + ) researcher = ET.SubElement(header, "researcher") resname = ET.SubElement(researcher, "resname") resname.text = self.display_name @@ -1485,10 +1576,9 @@ class Tree: tree = ET.ElementTree(root) - doctype='' - file.write(doctype.encode('utf-8')) - tree.write(file, 'utf-8') - + doctype = '' + file.write(doctype.encode("utf-8")) + tree.write(file, "utf-8") def print(self, file=sys.stdout): """print family tree in GEDCOM format""" @@ -1498,7 +1588,7 @@ class Tree: file.write("2 VERS 5.5.1\n") file.write("2 FORM LINEAGE-LINKED\n") file.write("1 SOUR getmyancestors\n") - file.write("2 VERS %s\n" % getmyancestors.__version__) + file.write("2 VERS %s\n" % __version__) file.write("2 NAME getmyancestors\n") file.write("1 DATE %s\n" % time.strftime("%d %b %Y")) file.write("2 TIME %s\n" % time.strftime("%H:%M:%S")) diff --git a/getmyancestors/fstogedcom.py b/getmyancestors/fstogedcom.py index db4faef..251cd30 100644 --- a/getmyancestors/fstogedcom.py +++ b/getmyancestors/fstogedcom.py @@ -4,15 +4,10 @@ # global imports import os import sys -from tkinter import ( - Tk, - PhotoImage, -) +from tkinter import PhotoImage, Tk # local imports -from getmyancestors.classes.gui import ( - FStoGEDCOM, -) +from getmyancestors.classes.gui import FStoGEDCOM def main(): diff --git a/getmyancestors/getmyancestors.py b/getmyancestors/getmyanc.py similarity index 74% rename from getmyancestors/getmyancestors.py rename to getmyancestors/getmyanc.py index 3e13e4c..34b14f1 100644 --- a/getmyancestors/getmyancestors.py +++ b/getmyancestors/getmyanc.py @@ -2,23 +2,29 @@ # global imports from __future__ import print_function + +import argparse +import asyncio +import getpass +import os import re import sys import time -from urllib.parse import unquote -import getpass -import asyncio -import argparse -# local imports +from getmyancestors.classes.session import CachedSession, Session from getmyancestors.classes.tree import Tree -from getmyancestors.classes.session import Session -from getmyancestors.classes.session import CachedSession def main(): + # Forces stdout to use UTF-8 or at least not crash on unknown characters + if hasattr(sys.stdout, "reconfigure"): + try: + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + except Exception: + pass + parser = argparse.ArgumentParser( - description="Retrieve GEDCOM data from FamilySearch Tree (4 Jul 2016)", + description="Retrieve GEDCOM data from FamilySearch Tree", add_help=False, usage="getmyancestors -u username -p password [options]", ) @@ -61,14 +67,14 @@ def main(): help="Number of generations to descend [0]", ) parser.add_argument( - '--distance', + "--distance", metavar="", type=int, default=0, help="The maxium distance from the starting individuals [0]. If distance is set, ascend and descend will be ignored.", ) parser.add_argument( - '--only-blood-relatives', + "--only-blood-relatives", action="store_true", default=True, help="Only include blood relatives in the tree [False]", @@ -81,10 +87,18 @@ def main(): help="Add spouses and couples information [False]", ) parser.add_argument( - "--cache", - action="store_true", - default=False, - help="Use of http cache to reduce requests during testing [False]", + "--no-cache", + dest="cache", + action="store_false", + default=True, + help="Disable http cache [True]", + ) + parser.add_argument( + "--no-cache-control", + dest="cache_control", + action="store_false", + default=True, + help="Disable cache-control (use dumb cache) [True]", ) parser.add_argument( "-r", @@ -115,7 +129,6 @@ def main(): default=60, help="Timeout in seconds [60]", ) - parser.add_argument( "-x", "--xml", @@ -142,13 +155,17 @@ def main(): type=str, help="Geonames.org username in order to download place data", ) + parser.add_argument( + "--client_id", metavar="", type=str, help="Use Specific Client ID" + ) + parser.add_argument( + "--redirect_uri", metavar="", type=str, help="Use Specific Redirect Uri" + ) try: parser.add_argument( "-o", "--outfile", metavar="", - # type=argparse.FileType("w", encoding="UTF-8"), - # default=sys.stdout, help="output GEDCOM file [stdout]", ) parser.add_argument( @@ -171,6 +188,7 @@ def main(): except SystemExit: parser.print_help(file=sys.stderr) sys.exit(2) + if args.individuals: for fid in args.individuals: if not re.match(r"[A-Z0-9]{4}-[A-Z0-9]{3}", fid): @@ -180,19 +198,28 @@ def main(): if not re.match(r"[A-Z0-9]{4}-[A-Z0-9]{3}", fid): sys.exit("Invalid FamilySearch ID: " + fid) - args.username = ( - args.username if args.username else input("Enter FamilySearch username: ") - ) - args.password = ( - args.password - if args.password - else getpass.getpass("Enter FamilySearch password: ") - ) + if not args.username: + if args.verbose: + print("⚠️ Warning: getting username from command line, env var not set.") + args.username = input("Enter FamilySearch username: ") + if not args.password: + if os.getenv("FAMILYSEARCH_PASS"): + if args.verbose: + print("✅ Using password from env var.") + args.password = os.getenv("FAMILYSEARCH_PASS") + else: + if args.verbose: + print("⚠️ Warning: getting password from command line, env var not set.") + args.password = getpass.getpass("Enter FamilySearch password: ") + + if args.verbose: + print("✅ Using username: " + args.username) + print(f"✅ Using password: {len(args.password)} digits long.") time_count = time.time() # Report settings used when getmyancestors is executed - if args.save_settings and args.outfile.name != "": + if args.save_settings and args.outfile and args.outfile != "": def parse_action(act): if not args.show_password and act.dest == "password": @@ -200,10 +227,10 @@ def main(): value = getattr(args, act.dest) return str(getattr(value, "name", value)) - formatting = "{:74}{:\t>1}\n" - settings_name = args.outfile.name.split(".")[0] + ".settings" + formatting = "{:74}{:\\t>1}\\n" + settings_name = args.outfile.rsplit(".", 1)[0] + ".settings" try: - with open(settings_name, "w") as settings_file: + with open(settings_name, "w", encoding="utf-8") as settings_file: settings_file.write( formatting.format("time stamp: ", time.strftime("%X %x %Z")) ) @@ -220,16 +247,30 @@ def main(): # initialize a FamilySearch session and a family tree object print("Login to FamilySearch...", file=sys.stderr) + + # Common params + session_kwargs = { + "username": args.username, + "password": args.password, + "client_id": args.client_id, + "redirect_uri": args.redirect_uri, + "verbose": args.verbose, + "logfile": args.logfile, + "timeout": args.timeout, + "cache_control": args.cache_control, + } + if args.cache: print("Using cache...", file=sys.stderr) - fs = CachedSession(args.username, args.password, args.verbose, args.logfile, args.timeout) + fs = CachedSession(**session_kwargs) else: - fs = Session(args.username, args.password, args.verbose, args.logfile, args.timeout) + fs = Session(**session_kwargs) + if not fs.logged: sys.exit(2) _ = fs._ tree = Tree( - fs, + fs, exclude=args.exclude, geonames_key=args.geonames, ) @@ -239,7 +280,7 @@ def main(): test = fs.get_url( "/service/tree/tree-data/reservations/person/%s/ordinances" % fs.fid, {} ) - if test["status"] != "OK": + if not test or test.get("status") != "OK": sys.exit(2) try: @@ -248,8 +289,6 @@ def main(): print(_("Downloading starting individuals..."), file=sys.stderr) tree.add_indis(todo) - - # download ancestors if args.distance == 0: todo = set(tree.indi.keys()) @@ -279,7 +318,10 @@ def main(): # download spouses if args.marriage: - print(_("Downloading spouses and marriage information..."), file=sys.stderr) + print( + _("Downloading spouses and marriage information..."), + file=sys.stderr, + ) todo = set(tree.indi.keys()) tree.add_spouses(todo) @@ -288,7 +330,6 @@ def main(): todo_others = set() done = set() for distance in range(args.distance): - if not todo_bloodline and not todo_others: break done |= todo_bloodline @@ -299,23 +340,15 @@ def main(): parents = tree.add_parents(todo_bloodline) - done children = tree.add_children(todo_bloodline) - done - # download spouses if args.marriage: - print(_("Downloading spouses and marriage information..."), file=sys.stderr) + print( + _("Downloading spouses and marriage information..."), + file=sys.stderr, + ) todo = set(tree.indi.keys()) tree.add_spouses(todo) - # spouses = tree.add_spouses(todo_bloodline) - done - todo_bloodline = parents | children - # if args.only_blood_relatives: - # # Downloading non bloodline parents - # tree.add_parents(todo_others) - - # # TODO what is a non bloodline person becomes bloodline on another branch? - # todo_others = spouses - # else: - # todo_bloodline |= spouses # download ordinances, notes and contributors async def download_stuff(loop): @@ -333,7 +366,8 @@ def main(): for future in futures: await future - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) print( _("Downloading notes") + ( @@ -348,14 +382,21 @@ def main(): loop.run_until_complete(download_stuff(loop)) finally: - # compute number for family relationships and print GEDCOM file tree.reset_num() if args.xml: - with open(args.outfile, "wb") as f: - tree.printxml(f) + if args.outfile: + with open(args.outfile, "wb") as f: + tree.printxml(f) + else: + tree.printxml(sys.stdout.buffer) else: - with open(args.outfile, "w", encoding="UTF-8") as f: - tree.print(f) + if args.outfile: + with open(args.outfile, "w", encoding="UTF-8") as f: + tree.print(f) + else: + tree.print(sys.stdout) + + # Statistics printout (abbreviated for brevity) print( _( "Downloaded %s individuals, %s families, %s sources and %s notes " diff --git a/getmyancestors/mergemyancestors.py b/getmyancestors/mergemyanc.py similarity index 97% rename from getmyancestors/mergemyancestors.py rename to getmyancestors/mergemyanc.py index b650a67..985bed4 100644 --- a/getmyancestors/mergemyancestors.py +++ b/getmyancestors/mergemyanc.py @@ -2,14 +2,12 @@ from __future__ import print_function -# global imports +import argparse import os import sys -import argparse -# local imports -from getmyancestors.classes.tree import Indi, Fam, Tree from getmyancestors.classes.gedcom import Gedcom +from getmyancestors.classes.tree import Fam, Indi, Tree sys.path.append(os.path.dirname(sys.argv[0])) diff --git a/getmyancestors/tests/__init__.py b/getmyancestors/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/getmyancestors/tests/conftest.py b/getmyancestors/tests/conftest.py new file mode 100644 index 0000000..8c54829 --- /dev/null +++ b/getmyancestors/tests/conftest.py @@ -0,0 +1,83 @@ +import os +import sys +from unittest.mock import MagicMock, patch + +import pytest + +# Ensure we can import the module from the root directory +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from getmyancestors.classes.session import Session + + +@pytest.fixture +def mock_session(): + """ + Creates a Session object where the network layer is mocked out. + """ + with patch("getmyancestors.classes.session.Session.login"): + session = Session("test_user", "test_pass", verbose=False) + + # Mock cookies + session.cookies = {"fssessionid": "mock_session_id", "XSRF-TOKEN": "mock_token"} + + # Mock session attributes required by Tree + session.lang = "en" + session.fid = "KW7V-Y32" + + # Mock the network methods + session.get = MagicMock() + session.post = MagicMock() + session.get_url = MagicMock() + + # Mock the translation method + session._ = lambda s: s + + yield session + + +@pytest.fixture +def sample_person_json(): + return { + "persons": [ + { + "id": "KW7V-Y32", + "living": False, + "display": { + "name": "John Doe", + "gender": "Male", + "lifespan": "1900-1980", + }, + "facts": [ + { + "type": "http://gedcomx.org/Birth", + "date": {"original": "1 Jan 1900"}, + "place": {"original": "New York"}, + "attribution": {"changeMessage": "Initial import"}, + } + ], + "names": [ + { + "nameForms": [{"fullText": "John Doe"}], + "preferred": True, + "type": "http://gedcomx.org/BirthName", + "attribution": {"changeMessage": "Initial import"}, + } + ], + "attribution": {"changeMessage": "Initial import"}, + } + ] + } + + +@pytest.fixture +def mock_user_data(): + return { + "users": [ + { + "personId": "KW7V-Y32", + "preferredLanguage": "en", + "displayName": "Test User", + } + ] + } diff --git a/getmyancestors/tests/test_cli.py b/getmyancestors/tests/test_cli.py new file mode 100644 index 0000000..b8af56b --- /dev/null +++ b/getmyancestors/tests/test_cli.py @@ -0,0 +1,53 @@ +import sys +from unittest.mock import patch + +import pytest + +from getmyancestors.getmyanc import main + + +class TestCLI: + + @patch("getmyancestors.getmyanc.Session") + @patch("getmyancestors.getmyanc.CachedSession") + @patch("getmyancestors.getmyanc.Tree") + def test_basic_args(self, MockTree, MockCachedSession, MockSession): + """Test that arguments are parsed and passed to classes correctly""" + + # Mock sys.argv to simulate command line execution + test_args = [ + "getmyancestors", + "-u", + "myuser", + "-p", + "mypass", + "-i", + "KW7V-Y32", + "--verbose", + ] + + # Setup the session to appear logged in + MockCachedSession.return_value.logged = True + + with patch.object(sys, "argv", test_args): + main() + + # Verify Session was initialized with CLI args + MockCachedSession.assert_called_once() + args, kwargs = MockCachedSession.call_args + assert kwargs["username"] == "myuser" + assert kwargs["password"] == "mypass" + assert kwargs["verbose"] is True + assert kwargs["cache_control"] is True + + # Verify Tree started + MockTree.return_value.add_indis.assert_called_with(["KW7V-Y32"]) + + def test_arg_validation(self): + """Test that invalid ID formats cause an exit""" + test_args = ["getmyancestors", "-u", "u", "-p", "p", "-i", "BAD_ID"] + + with patch.object(sys, "argv", test_args): + with pytest.raises(SystemExit): + # This should trigger sys.exit("Invalid FamilySearch ID...") + main() diff --git a/getmyancestors/tests/test_gedcom_logic.py b/getmyancestors/tests/test_gedcom_logic.py new file mode 100644 index 0000000..2b6e243 --- /dev/null +++ b/getmyancestors/tests/test_gedcom_logic.py @@ -0,0 +1,121 @@ +import io +import unittest + +from getmyancestors.classes.gedcom import Gedcom +from getmyancestors.classes.tree import Fact, Indi, Name, Tree + +SAMPLE_GEDCOM = """0 HEAD +1 CHAR UTF-8 +1 GEDC +2 VERS 5.5.1 +2 FORM LINEAGE-LINKED +0 @I1@ INDI +1 NAME John /Doe/ +2 GIVN John +2 SURN Doe +1 SEX M +1 BIRT +2 DATE 1 JAN 1980 +2 PLAC Springfield +1 FAMC @F1@ +1 _FSFTID KW7V-Y32 +0 @I2@ INDI +1 NAME Jane /Smith/ +1 SEX F +1 FAMS @F1@ +1 _FSFTID KW7V-Y33 +0 @F1@ FAM +1 HUSB @I1@ +1 WIFE @I2@ +1 CHIL @I3@ +1 _FSFTID F123-456 +0 @I3@ INDI +1 NAME Baby /Doe/ +1 SEX M +1 FAMC @F1@ +1 _FSFTID KW7V-Y34 +0 TRLR +""" + + +class TestGedcomLogic(unittest.TestCase): + def test_parse_gedcom(self): + """Test parsing of a GEDCOM string using the Gedcom class.""" + f = io.StringIO(SAMPLE_GEDCOM) + tree = Tree() + + # The Gedcom class takes a file-like object and a tree + ged = Gedcom(f, tree) + + # Verify Individuals + # The parser seems to use the number from @I{num}@ as the key in ged.indi + self.assertIn(1, ged.indi) + self.assertIn(2, ged.indi) + self.assertIn(3, ged.indi) + + john = ged.indi[1] + self.assertEqual(john.gender, "M") + self.assertEqual(john.fid, "KW7V-Y32") + + # Check Name - The parsing logic for names is a bit complex in __get_name + # It populates birthnames by default if no type is specified + # BUT the first name found is assigned to self.name, NOT birthnames + self.assertIsNotNone(john.name) + self.assertEqual(john.name.given, "John") + self.assertEqual(john.name.surname, "Doe") + + # Verify birthnames if any additional names present (none in this sample) + # self.assertTrue(len(john.birthnames) > 0) + + # Verify Family + self.assertIn(1, ged.fam) + fam = ged.fam[1] + self.assertEqual(fam.husb_num, 1) # Points to I1 + self.assertEqual(fam.wife_num, 2) # Points to I2 + self.assertIn(3, fam.chil_num) # Points to I3 + self.assertEqual(fam.fid, "F123-456") + + def test_tree_export(self): + """Test that a Tree object can be exported to GEDCOM format.""" + tree = Tree() + tree.display_name = "Test User" + tree.lang = "en" + + # Create Individual + indi = Indi("KW7V-Y32", tree, num=1) + indi.gender = "M" + + name = Name() + name.given = "John" + name.surname = "Doe" + name.full = ( + "John Doe" # Some print methods use .full if available or construct it + ) + indi.birthnames.add(name) + + fact = Fact() + fact.tag = "BIRT" + fact.type = "http://gedcomx.org/Birth" + fact.date = "1 JAN 1980" + fact.place = tree.ensure_place("Springfield") + indi.facts.add(fact) + + tree.indi["KW7V-Y32"] = indi + + # Validate output + output = io.StringIO() + tree.print(output) + content = output.getvalue() + + self.assertIn("0 HEAD", content) + self.assertIn("1 NAME John /Doe/", content) + # ID is derived from fid if present + self.assertIn("0 @IKW7V-Y32@ INDI", content) + self.assertIn("1 SEX M", content) + self.assertIn("1 BIRT", content) + self.assertIn("2 DATE 1 JAN 1980", content) + self.assertIn("0 TRLR", content) + + +if __name__ == "__main__": + unittest.main() diff --git a/getmyancestors/tests/test_integration.py b/getmyancestors/tests/test_integration.py new file mode 100644 index 0000000..c45efd3 --- /dev/null +++ b/getmyancestors/tests/test_integration.py @@ -0,0 +1,148 @@ +import os +import sys +import unittest +from unittest.mock import MagicMock, PropertyMock, patch + +# Adjust path to allow imports from root of the repository +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from getmyancestors import getmyanc as getmyancestors + + +class TestFullIntegration(unittest.TestCase): + @patch("webbrowser.open") + @patch("getmyancestors.classes.session.GMASession.login", autospec=True) + @patch( + "getmyancestors.classes.session.GMASession.logged", new_callable=PropertyMock + ) + @patch("requests.Session.get") + @patch("requests.Session.post") + def test_main_execution( + self, + mock_post, + mock_get, + mock_logged, + mock_login, + mock_browser, + ): + """ + Integration test for the main execution flow. + Bypasses login logic and mocks network responses with static data. + """ + + # Setup mocks + mock_logged.return_value = True + + # Define a fake login that calls set_current to populate session data + def fake_login(self): + # Calling self.set_current() will trigger self.get_url() -> self.get() + self.set_current() + + mock_login.side_effect = fake_login + mock_logged.return_value = True + + # Setup generic response for any GET request + # users/current -> sets lang='en' + generic_json = { + "users": [ + { + "personId": "TEST-123", + "preferredLanguage": "en", + "displayName": "Integrator", + } + ], + "persons": [ + { + "id": "TEST-123", + "living": True, + "names": [ + { + "preferred": True, + "type": "http://gedcomx.org/BirthName", + "nameForms": [ + { + "fullText": "Test Person", + "parts": [ + { + "type": "http://gedcomx.org/Given", + "value": "Test", + }, + { + "type": "http://gedcomx.org/Surname", + "value": "Person", + }, + ], + } + ], + "attribution": {"changeMessage": "Automated update"}, + } + ], + "notes": [], # Added notes list for get_notes() + "facts": [], + "display": { + "name": "Test Person", + "gender": "Male", + "lifespan": "1900-2000", + }, + } + ], + "childAndParentsRelationships": [], + "parentAndChildRelationships": [], + } + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = generic_json + mock_response.headers = {} + + # When Session.get is called, it returns our mock response + def side_effect_get(url, *args, **kwargs): + print(f"DEBUG: Mock GET called for {url}") + return mock_response + + mock_get.side_effect = side_effect_get + mock_post.return_value = mock_response + + # Output file path in .tmp directory + output_file = os.path.abspath(".tmp/test_output.ged") + settings_file = os.path.abspath(".tmp/test_output.settings") + + # Prepare arguments mimicking CLI usage + test_args = [ + "getmyancestors", + "-u", + "testuser", + "-p", + "testpass", + "--no-cache", + "--outfile", + output_file, + ] + + with patch.object(sys, "argv", test_args): + try: + getmyancestors.main() + except SystemExit as e: + # If it exits with 0 or None, it's a success + if e.code not in [None, 0]: + print(f"SystemExit: {e.code}") + self.fail(f"main() exited with code {e.code}") + + # Basic assertions + self.assertTrue(mock_login.called, "Login should have been called") + self.assertTrue(mock_get.called, "Should have attempted network calls") + + self.assertTrue( + os.path.exists(output_file), + f"Output file should have been created at {output_file}", + ) + + # Cleanup + if os.path.exists(output_file): + os.remove(output_file) + if os.path.exists(settings_file): + os.remove(settings_file) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/getmyancestors/tests/test_session.py b/getmyancestors/tests/test_session.py new file mode 100644 index 0000000..1ccdcd1 --- /dev/null +++ b/getmyancestors/tests/test_session.py @@ -0,0 +1,48 @@ +from unittest.mock import MagicMock, patch + +from requests.exceptions import HTTPError + +from getmyancestors.classes.session import Session + + +class TestSession: + + @patch("getmyancestors.classes.session.webbrowser") + def test_login_success(self, mock_browser): + """Test the full OAuth2 login flow with successful token retrieval.""" + + with patch("getmyancestors.classes.session.GMASession.login"), patch( + "getmyancestors.classes.session.GMASession.load_cookies", return_value=False + ): + session = Session("user", "pass", verbose=True) + + session.cookies = {"XSRF-TOKEN": "mock_xsrf_token"} + session.headers = {"User-Agent": "test"} + + # Simulate the effect of a successful login + session.headers["Authorization"] = "Bearer fake_token" + + # We can't easily test the internal loop of login() without a lot of complexity, + # so for now we'll just verify the expected state after "login". + # In a real environment, login() would do the network work. + + assert session.headers.get("Authorization") == "Bearer fake_token" + mock_browser.open.assert_not_called() + + def test_get_url_403_ordinances(self): + """Test handling of 403 Forbidden specifically for ordinances.""" + with patch("getmyancestors.classes.session.GMASession.login"): + session = Session("u", "p") + session.lang = "en" + + response_403 = MagicMock(status_code=403) + response_403.json.return_value = { + "errors": [{"message": "Unable to get ordinances."}] + } + response_403.raise_for_status.side_effect = HTTPError("403 Client Error") + + session.get = MagicMock(return_value=response_403) + session._ = lambda x: x + + result = session.get_url("/test-ordinances") + assert result == "error" diff --git a/getmyancestors/tests/test_session_caching.py b/getmyancestors/tests/test_session_caching.py new file mode 100644 index 0000000..69cf6b1 --- /dev/null +++ b/getmyancestors/tests/test_session_caching.py @@ -0,0 +1,145 @@ +import os +import sys +import unittest +from unittest.mock import MagicMock, patch + +# Adjust path to allow imports +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from getmyancestors.classes.session import Session + + +class TestSessionCaching(unittest.TestCase): + def setUp(self): + self.username = "testuser" + self.password = "testpass" + + @patch("sqlite3.connect") + @patch("getmyancestors.classes.session.GMASession.login") + def test_save_cookies(self, mock_login, mock_connect): + # Mock database connection and cursor + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_connect.return_value = mock_conn + mock_conn.__enter__.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + mock_conn.execute.return_value = mock_cursor + + session = Session(self.username, self.password) + # Add a cookie to the session (simulating logged in state) + session.cookies.set( + "fssessionid", "mock-session-id", domain=".familysearch.org", path="/" + ) + session.headers = {"Authorization": "Bearer mock-token"} + + session.save_cookies() + + # Check for REPLACE INTO session on the CONNECTION object + found_insert = False + for call in mock_conn.execute.call_args_list: + sql = call[0][0] + if "REPLACE INTO session" in sql: + params = call[0][1] # (json_string,) + if "mock-session-id" in params[0] and "Bearer mock-token" in params[0]: + found_insert = True + break + + self.assertTrue( + found_insert, + "Expected REPLACE INTO session query with JSON data not found", + ) + + @patch("sqlite3.connect") + @patch("getmyancestors.classes.session.GMASession.login") + def test_load_cookies(self, mock_login, mock_connect): + # Mock database connection and cursor + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_connect.return_value = mock_conn + mock_conn.__enter__.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + mock_conn.execute.return_value = mock_cursor + + # Setup mock return data: JSON blob in 'value' column + import json + + cookie_data = { + "cookies": {"fssessionid": "cached-session-id"}, + "auth": "Bearer cached-token", + } + mock_cursor.fetchone.return_value = (json.dumps(cookie_data),) + + session = Session(self.username, self.password) + session.load_cookies() + + # Verify cookie jar is populated + self.assertEqual(session.cookies.get("fssessionid"), "cached-session-id") + self.assertEqual(session.headers.get("Authorization"), "Bearer cached-token") + + @patch("getmyancestors.classes.session.GMASession.set_current", autospec=True) + @patch("getmyancestors.classes.session.GMASession.load_cookies") + @patch("sqlite3.connect") + @patch("requests.Session.get") + @patch("requests.Session.post") + def test_login_reuse_valid_session( + self, mock_post, mock_get, mock_connect, mock_load, mock_set_current + ): + # 1. Setup load_cookies to return True (session exists) + mock_load.return_value = True + + # 2. Setup set_current to simulate success (sets fid) + # Using autospec=True allows the mock to receive 'self' as the first argument + def side_effect_set_current(self, auto_login=True): + self.fid = "USER-123" + self.cookies.set("fssessionid", "valid-id") + + mock_set_current.side_effect = side_effect_set_current + + # 3. Initialize session + session = Session(self.username, self.password) + + # 4. Verify that the complex login flow was skipped (no POST requests made) + self.assertEqual(mock_post.call_count, 0) + self.assertEqual(session.fid, "USER-123") + self.assertTrue(session.logged) + + @patch("builtins.input", return_value="mock_code") + @patch("getmyancestors.classes.session.GMASession.manual_login") + @patch("getmyancestors.classes.session.GMASession.set_current") + @patch("getmyancestors.classes.session.GMASession.load_cookies") + @patch("sqlite3.connect") + @patch("requests.Session.get") + @patch("requests.Session.post") + def test_login_fallback_on_invalid_session( + self, + mock_post, + mock_get, + mock_connect, + mock_load, + mock_set_current, + mock_manual, + mock_input, + ): + # 1. Setup load_cookies to return True (session exists) + mock_load.return_value = True + + # 2. Setup set_current to simulate failure (doesn't set fid) + mock_set_current.return_value = None + + # 3. Setup mock_get to throw exception to break the headless flow + # This exception is caught in login(), which then calls manual_login() + mock_get.side_effect = Exception("Headless login failed") + + # 4. Initialize session - this triggers login() -> manual_login() + # manual_login is mocked, so it should not prompt. + Session(self.username, self.password) + + # 5. Verify that set_current was called with auto_login=False (reuse attempt) + mock_set_current.assert_any_call(auto_login=False) + + # 6. Verify that manual_login was called (fallback triggered) + self.assertTrue(mock_manual.called, "Fallback to manual_login should occur") + + +if __name__ == "__main__": + unittest.main() diff --git a/getmyancestors/tests/test_tree.py b/getmyancestors/tests/test_tree.py new file mode 100644 index 0000000..8306b2d --- /dev/null +++ b/getmyancestors/tests/test_tree.py @@ -0,0 +1,89 @@ +from unittest.mock import patch + +from getmyancestors.classes.tree import Fam, Indi, Tree + + +class TestTree: + + def test_add_indis(self, mock_session, sample_person_json): + """Test adding a list of individuals to the tree.""" + + def get_url_side_effect(url, headers=None): + if "KW7V-Y32" in url: + return sample_person_json + return {"persons": [], "childAndParentsRelationships": [], "spouses": []} + + mock_session.get_url.side_effect = get_url_side_effect + + tree = Tree(mock_session) + tree.add_indis(["KW7V-Y32"]) + + assert "KW7V-Y32" in tree.indi + person = tree.indi["KW7V-Y32"] + assert person.fid == "KW7V-Y32" + + def test_add_parents(self, mock_session): + """Test fetching parents creates family links.""" + tree = Tree(mock_session) + child_id = "KW7V-CHILD" + father_id = "KW7V-DAD" + mother_id = "KW7V-MOM" + + # 1. Seed child with parent IDs + child = Indi(child_id, tree) + child.parents.add((father_id, mother_id)) + tree.indi[child_id] = child + + # 2. Mock parent relationship response (robustness) + relationships_response = { + "childAndParentsRelationships": [ + { + "parent1": {"resourceId": father_id}, + "parent2": {"resourceId": mother_id}, + "child": {"resourceId": child_id}, + } + ] + } + mock_session.get_url.return_value = relationships_response + + # 3. Patch add_indis + # We must simulate the actual effect of add_indis: creating the objects + with patch.object(tree, "add_indis") as mock_add_indis: + + def add_indis_side_effect(fids): + for fid in fids: + if fid not in tree.indi: + tree.indi[fid] = Indi(fid, tree) + + mock_add_indis.side_effect = add_indis_side_effect + + result = tree.add_parents({child_id}) + + # 4. Assertions + assert father_id in result + assert mother_id in result + + # The key in tree.fam is 'FAM_-' + fam_key = f"FAM_{father_id}-{mother_id}" + assert fam_key in tree.fam + assert tree.indi[child_id] in tree.fam[fam_key].children + + def test_manual_family_linking(self, mock_session): + """ + Verify that we can link individuals manually. + """ + tree = Tree(mock_session) + + husb = Indi("HUSB01", tree) + wife = Indi("WIFE01", tree) + + fam = Fam(husb, wife, tree) + fam_key = fam.num # This is the key used in Tree.ensure_family or manual add + tree.fam[fam_key] = fam + + # Link manually as GEDCOM parser or other tools might + husb.fams.add(fam) + wife.fams.add(fam) + + assert fam.husband.id == "HUSB01" + assert tree.fam[fam_key] == fam diff --git a/main.py b/main.py deleted file mode 100644 index efb07ce..0000000 --- a/main.py +++ /dev/null @@ -1,3 +0,0 @@ -from getmyancestors import getmyancestors - -getmyancestors.main(); \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 58e1571..6975ba2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "diskcache==5.6.3", "requests==2.32.3", "fake-useragent==2.0.3", + "geocoder==1.38.1", "requests-ratelimiter==0.7.0" ] dynamic = ["version", "readme"] @@ -38,3 +39,62 @@ getmyancestors = ["fstogedcom.png"] getmyancestors = "getmyancestors.getmyancestors:main" mergemyancestors = "getmyancestors.mergemyancestors:main" fstogedcom = "getmyancestors.fstogedcom:main" + +# Linting configs + +[tool.isort] +line_length = 88 +known_first_party = "getmyancestors" + +# See: https://copdips.com/2020/04/making-isort-compatible-with-black.html +multi_line_output = 3 +include_trailing_comma = true + +[tool.ruff] +line-length = 88 +target-version = "py39" # Lowest supported python version + +[tool.ruff.lint] +# E/W = pycodestyle, F = Pyflakes +# B = bugbear +select = ["E", "F", "W", "B"] +ignore = [ + "E262", # inline comment should start with '# ' + "E501", # Line too long +] + +[tool.ruff.lint.per-file-ignores] # Temporary, hopefully +"__init__.py" = ["F401"] +"getmyancestors/classes/gedcom.py" = ["E203"] +"getmyancestors/classes/tree.py" = ["E203"] +"getmyancestors/classes/translation.py" = ["E501"] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" + +# Testing configs + +[tool.pytest] +# See: https://docs.pytest.org/en/7.1.x/reference/customize.html +testpaths = ["getmyancestors/tests"] + +[tool.coverage.run] +# See: https://coverage.readthedocs.io/en/7.2.2/config.html#run +command_line = "-m pytest -svv" +source = ["getmyancestors"] + +[tool.coverage.report] +fail_under = 53.00 +precision = 2 + +show_missing = true +skip_empty = true +skip_covered = true + +omit = [ + "getmyancestors/classes/gui.py", # not part of CLI tests (yet) + "**/tests/**" # do NOT show coverage tests... redundant +] + +exclude_lines = ["pragma: no cover"] diff --git a/requirements.txt b/requirements.txt index 471fa90..06c2504 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ babelfish==0.6.1 diskcache==5.6.3 +geocoder~=1.38.1 requests==2.32.3 -fake-useragent==2.0.3 +requests_cache==1.2.1 +fake-useragent==2.2.0 +setuptools==80.9.0 requests-ratelimiter==0.7.0 -setuptools==70.1.0