--- /dev/null
+source .venv/bin/activate
+unset PS1
+source .env
+export PYTHONPATH=.
+
--- /dev/null
+!.gemini/
+!test_debug.py
+
--- /dev/null
+---
+name: ci
+
+"on":
+ push: {}
+
+permissions:
+ contents: read
+
+jobs:
+ test:
+ strategy:
+ matrix:
+ os: [ubuntu-latest, macos-latest, windows-latest]
+ runs-on: ${{ matrix.os }}
+
+ env:
+ SKIP_VENV: 1
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ submodules: recursive
+
+ - name: Fetch master (for incremental diff, lint filter mask)
+ run: git fetch origin master
+
+ - name: Reload Cache / pip
+ uses: actions/setup-python@v5
+ with:
+ python-version: 3
+ cache: "pip" # caching pip dependencies
+ cache-dependency-path: "**/*requirements*.txt"
+ # update-environment: false
+
+ - name: Install requirements
+ run: |
+ pip install -r requirements.txt
+ pip install -r .requirements-lint.txt
+
+ # NOTE: pytest is needed to lint the folder: "tests/"
+ # pip install -r requirements-test.txt
+
+ - name: format
+ run: make format
+
+ - name: Lint
+ run: make lint
+
+ - name: Verify no new formatting changes applied
+ run: |
+ git update-index -q --refresh
+ git diff # show the diff
+ git diff-index --quiet HEAD -- # exit non-zero on any diff
+
+ - name: Test [Unit]
+ env:
+ FAMILYSEARCH_USER: ${{ secrets.FAMILYSEARCH_USER }}
+ FAMILYSEARCH_PASS: ${{ secrets.FAMILYSEARCH_PASS }}
+ run: make test/unit
+
+ - name: Test [E2E]
+ env:
+ FAMILYSEARCH_USER: ${{ secrets.FAMILYSEARCH_USER }}
+ FAMILYSEARCH_PASS: ${{ secrets.FAMILYSEARCH_PASS }}
+ run: make test/e2e
# Redis
dump.rdb
-# Dotfiles
-.*
-!.gitignore
-!.readthedocs.yml
-
# vscode
.vscode/
# getmyancestors stuff
*.log
+*.txt
*.settings
-*.ged
\ No newline at end of file
+*.ged
+*.db
+*.sqlite
+*.sqlite3
+
+!.geminiignore
+/test_debug.py
+
--- /dev/null
+[MASTER]
+
+fail-under=9.5
+
+
+[MESSAGES CONTROL]
+
+disable=
+ fixme,
+ consider-using-f-string,
+ missing-module-docstring,
+ missing-function-docstring,
+ duplicate-code,
+ too-few-public-methods,
+ too-many-arguments,
+ too-many-positional-arguments,
+ too-many-instance-attributes,
+ too-many-branches,
+ too-many-statements,
--- /dev/null
+black==25.12.0
+coverage==7.13.1
+flake8==7.3.0
+isort==7.0.0
+mypy==1.19.1
+pylint==4.0.4
+pytest==9.0.2
+ruff==0.14.10
+types-requests==2.32.4.20250913
+
--- /dev/null
+**
+!.gitignore
--- /dev/null
+# .ONESHELL:
+SHELL:=/bin/bash
+.DEFAULT_GOAL=_help
+
+.PHONY: _help
+_help:
+ @printf "\nUsage: make <command>, valid commands:\n\n"
+ @grep -h "##H@@" $(MAKEFILE_LIST) | grep -v IGNORE_ME | sed -e 's/##H@@//' | column -t -s $$'\t'
+
+# help: ## Show this help
+# @grep -Eh '\s##\s' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
+
+
+-include .env
+
+.PHONY: test/e2e
+test/e2e: ##H@@ E2E/Smoke test for Bertrand Russell (LZDB-KV4)
+ which python
+ coverage run -p -m getmyancestors --verbose \
+ -u "${FAMILYSEARCH_USER}" `# password goes in .env file` \
+ --no-cache-control \
+ -i LZDB-KV4 -a 0 \
+ --outfile .tmp/russell_smoke_test.ged
+ echo "✓ Script completed successfully"
+ echo "File size: $(wc -c < .tmp/russell_smoke_test.ged) bytes"
+ echo "Line count: $(wc -l < .tmp/russell_smoke_test.ged) lines"
+ echo "--- First 20 lines of output ---"
+ head -n 20 .tmp/russell_smoke_test.ged
+ echo "--- Last 5 lines of output ---"
+ tail -n 5 .tmp/russell_smoke_test.ged
+
+
+.PHONY: test/unit
+test/unit: ##H@@ Run unit tests
+ coverage run -p -m pytest getmyancestors/tests
+
+.PHONY: test/
+test/: ##H@@ Run unit & E2E tests
+test/: test/unit test/e2e
+
+.PHONY: coverage
+coverage: ##H@@ Combine all coverage data and show report
+ -coverage combine
+ coverage report
+
+
+REMOTE_HEAD ?= origin/master
+PY_CHANGED_FILES ?= $(shell git diff --name-only --diff-filter=MACU $(REMOTE_HEAD) '*.py')
+
+.PHONY: format
+format: ##H@@ Format with black & isort
+ isort ${PY_CHANGED_FILES}
+ black ${PY_CHANGED_FILES}
+ ruff check --fix --exit-zero ${PY_CHANGED_FILES}
+
+.PHONY: lint
+lint: ##H@@ Lint with flake8
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # x-fail as of Dec 2025
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ ruff check --exit-zero ${PY_CHANGED_FILES}
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # Disabled checks, for now
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # pylint ${PY_CHANGED_FILES}
+ # mypy ${PY_CHANGED_FILES}
+
+
+.PHONY: clean
+clean: ##H@@ Clean up build files/cache
+ rm -rf *.egg-info build dist .coverage
+ find . \( -name .venv -prune \) \
+ -o \( -name __pycache__ -o -name .mypy_cache -o -name .ruff_cache -o -name .pytest_cache \) \
+ -exec rm -rf {} +
# coding: utf-8
-from . import getmyancestors
-from . import mergemyancestors
-__version__ = "1.0.6"
+__version__ = "1.1.2"
-from getmyancestors import getmyancestors
+from getmyancestors import getmyanc
-getmyancestors.main()
+getmyanc.main()
MAX_PERSONS = 200
FACT_TAG_EVENT_TYPE = {
- 'BIRT': 'Birth',
- 'DEAT': 'Death',
- 'BURI': 'Burial',
- 'CREM': 'Cremation',
- 'NATU': 'Naturalization',
+ "BIRT": "Birth",
+ "DEAT": "Death",
+ "BURI": "Burial",
+ "CREM": "Cremation",
+ "NATU": "Naturalization",
}
FACT_TAGS = {
"NotNeeded": "INFANT",
}
+
# mergemyancestors constants and functions
-def reversed_dict(d):
+def reversed_dict(d: dict) -> dict:
return {val: key for key, val in d.items()}
# mergemyancestors classes
+from getmyancestors.classes.constants import FACT_TYPES, ORDINANCES
from getmyancestors.classes.tree import (
- Indi,
Fact,
Fam,
+ Indi,
Memorie,
Name,
Note,
Ordinance,
Source,
)
-from getmyancestors.classes.constants import FACT_TYPES, ORDINANCES
class Gedcom:
if self.tag == "DATE":
fact.date = self.__get_text()
elif self.tag == "PLAC":
- fact.place = self.__get_text()
+ fact.place = self.tree.ensure_place(self.__get_text())
elif self.tag == "MAP":
fact.map = self.__get_map()
elif self.tag == "NOTE":
# fstogedcom classes and functions
+import asyncio
import os
import re
-import time
-import asyncio
import tempfile
+import time
from threading import Thread
+from tkinter import IntVar, Menu, StringVar, TclError, filedialog, messagebox
+from tkinter.ttk import Button, Checkbutton, Entry, Frame, Label, Notebook, Treeview
+
from diskcache import Cache
-from tkinter import (
- StringVar,
- IntVar,
- filedialog,
- messagebox,
- Menu,
- TclError,
-)
-from tkinter.ttk import Frame, Label, Entry, Button, Checkbutton, Treeview, Notebook
-
-from getmyancestors.classes.tree import Indi, Fam, Tree
from getmyancestors.classes.gedcom import Gedcom
from getmyancestors.classes.session import Session
from getmyancestors.classes.translation import translations
+from getmyancestors.classes.tree import Fam, Indi, Tree
tmp_dir = os.path.join(tempfile.gettempdir(), "fstogedcom")
cache = Cache(tmp_dir)
self.save_password = IntVar()
self.save_password.set(cache.get("save_password") or 0)
- check_save_password = Checkbutton(self, text=_("Save Password"), variable=self.save_password, onvalue=1, offvalue=0)
+ check_save_password = Checkbutton(
+ self,
+ text=_("Save Password"),
+ variable=self.save_password,
+ onvalue=1,
+ offvalue=0,
+ )
label_username.grid(row=0, column=0, pady=15, padx=(0, 5))
entry_username.grid(row=0, column=1)
cache.add("save_password", save_pass)
url = "/service/tree/tree-data/reservations/person/%s/ordinances" % self.fs.fid
- lds_account = self.fs.get_url(url, {}).get("status") == "OK"
+ lds_account = self.fs.get_url(url, {}, no_api=True).get("status") == "OK"
self.options = Options(self.form, lds_account)
self.info("")
self.sign_in.destroy()
-# global imports
+import contextlib
+import json
+import os
+import sqlite3
import sys
import time
-from urllib.parse import urlparse, parse_qs
+import webbrowser
+from urllib.parse import parse_qs, urlparse
import requests
from requests_cache import CachedSession as CSession
-from fake_useragent import UserAgent
-
from requests_ratelimiter import LimiterAdapter
# local imports
from getmyancestors.classes.translation import translations
+DEFAULT_CLIENT_ID = "a02j000000KTRjpAAH"
+DEFAULT_REDIRECT_URI = "https://misbach.github.io/fs-auth/index_raw.html"
+
-# class Session(requests.Session):
class GMASession:
"""Create a FamilySearch session
:param username and password: valid FamilySearch credentials
:param timeout: time before retry a request
"""
- def __init__(self, username, password, verbose=False, logfile=False, timeout=60):
- # super().__init__('http_cache', backend='filesystem', expire_after=86400)
- # super().__init__()
+ def __init__(
+ self,
+ username,
+ password,
+ client_id=None,
+ redirect_uri=None,
+ verbose=False,
+ logfile=False,
+ timeout=60,
+ ):
self.username = username
self.password = password
+ self.client_id = client_id or DEFAULT_CLIENT_ID
+ self.redirect_uri = redirect_uri or DEFAULT_REDIRECT_URI
self.verbose = verbose
self.logfile = logfile
self.timeout = timeout
self.fid = self.lang = self.display_name = None
self.counter = 0
- self.headers = {"User-Agent": UserAgent().firefox}
+
+ # Persistence setup
+ os.makedirs("http_cache", exist_ok=True)
+ self.db_path = "http_cache/requests.sqlite"
+ self.cookie_file = os.path.expanduser("~/.getmyancestors_cookies.json")
+ self._init_db()
+
+ # Hardcode robust User-Agent to avoid bot detection
+ self.headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
+ "Accept-Language": "en-US,en;q=0.9",
+ }
# Apply a rate-limit (5 requests per second) to all requests
+ # Credit: Josemando Sobral
adapter = LimiterAdapter(per_second=5)
- self.mount('http://', adapter)
- self.mount('https://', adapter)
+ self.mount("http://", adapter)
+ self.mount("https://", adapter)
self.login()
+ def _init_db(self):
+ """Initialize SQLite database for session storage"""
+ with sqlite3.connect(self.db_path) as conn:
+ conn.execute(
+ "CREATE TABLE IF NOT EXISTS session (key TEXT PRIMARY KEY, value TEXT)"
+ )
+ conn.commit()
+
@property
def logged(self):
- return bool(self.cookies.get("fssessionid"))
+ return bool(
+ self.cookies.get("fssessionid") or self.headers.get("Authorization")
+ )
+
+ def save_cookies(self):
+ """save cookies and authorization header to SQLite"""
+ try:
+ data = {
+ "cookies": requests.utils.dict_from_cookiejar(self.cookies),
+ "auth": self.headers.get("Authorization"),
+ }
+ with sqlite3.connect(self.db_path) as conn:
+ conn.execute(
+ "REPLACE INTO session (key, value) VALUES ('current', ?)",
+ (json.dumps(data),),
+ )
+ conn.commit()
+
+ if self.verbose:
+ self.write_log("Session saved to SQLite: " + self.db_path)
+ except Exception as e:
+ self.write_log("Error saving session: " + str(e))
+
+ def load_cookies(self):
+ """load cookies and authorization header from SQLite or migrate from JSON"""
+ # 1. Try SQLite first
+ try:
+ with sqlite3.connect(self.db_path) as conn:
+ row = conn.execute(
+ "SELECT value FROM session WHERE key = 'current'"
+ ).fetchone()
+ if row:
+ data = json.loads(row[0])
+ self._apply_session_data(data)
+ if self.verbose:
+ self.write_log("Session loaded from SQLite")
+ return True
+ except Exception as e:
+ self.write_log("Error loading session from SQLite: " + str(e))
+
+ # 2. Migration from JSON if exists
+ if os.path.exists(self.cookie_file):
+ try:
+ with open(self.cookie_file, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ self._apply_session_data(data)
+ self.save_cookies() # Save to SQLite
+ os.rename(
+ self.cookie_file, self.cookie_file + ".bak"
+ ) # Backup and disable
+ if self.verbose:
+ self.write_log("Migrated session from JSON to SQLite")
+ return True
+ except Exception as e:
+ self.write_log("Error migrating session from JSON: " + str(e))
+
+ return False
+
+ def _apply_session_data(self, data):
+ """Internal helper to apply session dict to current session"""
+ if isinstance(data, dict) and ("cookies" in data or "auth" in data):
+ cookies_dict = data.get("cookies", {})
+ auth_header = data.get("auth")
+ else:
+ cookies_dict = data
+ auth_header = None
+
+ self.cookies.update(requests.utils.cookiejar_from_dict(cookies_dict))
+ if auth_header:
+ self.headers.update({"Authorization": auth_header})
def write_log(self, text):
"""write text in the log file"""
"""retrieve FamilySearch session ID
(https://familysearch.org/developers/docs/guides/oauth2)
"""
- while True:
+ if self.load_cookies():
+ if self.verbose:
+ self.write_log("Attempting to reuse cached session...")
+ # Use auto_login=False to prevent recursion if session is invalid
+ self.set_current(auto_login=False)
+ if self.logged and self.fid:
+ if self.verbose:
+ self.write_log("Successfully reused cached session.")
+ return
+ if self.verbose:
+ self.write_log("Cached session invalid or expired.")
+
+ # Define context manager for disabling cache
+ if hasattr(self, "cache_disabled"):
+ cache_context = self.cache_disabled()
+ else:
+ cache_context = contextlib.nullcontext()
+
+ with cache_context:
try:
+ if not self.username or not self.password:
+ return self.manual_login()
+
+ # Clear cookies to ensure fresh start for new login
+ self.cookies.clear()
+
url = "https://www.familysearch.org/auth/familysearch/login"
self.write_log("Downloading: " + url)
- self.get(url, headers=self.headers)
- xsrf = self.cookies["XSRF-TOKEN"]
+ self.get(url, headers=self.headers, timeout=self.timeout)
+ xsrf = self.cookies.get("XSRF-TOKEN")
+ if not xsrf:
+ self.write_log("No XSRF token found. Switching to manual login.")
+ return self.manual_login()
+
url = "https://ident.familysearch.org/login"
self.write_log("Downloading: " + url)
res = self.post(
"password": self.password,
},
headers=self.headers,
+ timeout=self.timeout,
)
+
try:
data = res.json()
except ValueError:
- self.write_log("Invalid auth request")
- self.write_log(res.headers)
- self.write_log(res.text)
-
- raise "Invalid auth request"
- # continue
- if "loginError" in data:
- self.write_log(data["loginError"])
- return
+ self.write_log(f"Headless Login Failed. Status: {res.status_code}")
+ self.write_log(f"Response Preview: {res.text[:200]}")
+ self.write_log("Switching to manual login.")
+ return self.manual_login()
+
if "redirectUrl" not in data:
- self.write_log(res.text)
- continue
+ self.write_log("Redirect URL not found in response.")
+ return self.manual_login()
url = data["redirectUrl"]
self.write_log("Downloading: " + url)
- res = self.get(url, headers=self.headers)
- res.raise_for_status()
+ self.get(url, headers=self.headers, timeout=self.timeout)
- url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id=a02j000000KTRjpAAH&redirect_uri=https://misbach.github.io/fs-auth/index_raw.html&username={self.username}"
+ url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id={self.client_id}&redirect_uri={self.redirect_uri}&username={self.username}"
self.write_log("Downloading: " + url)
- response = self.get(url, allow_redirects=False, headers=self.headers)
- location = response.headers["location"]
- code = parse_qs(urlparse(location).query).get("code")
+
+ # Allow redirects so we follow the chain to the callback URI
+ response = self.get(
+ url,
+ allow_redirects=True,
+ headers=self.headers,
+ timeout=self.timeout,
+ )
+
+ # Check if we landed on the redirect URI (or have the code in the URL)
+ final_url = response.url
+ code = None
+
+ if "code=" in final_url:
+ code = parse_qs(urlparse(final_url).query).get("code")
+
+ # If not in final URL, check history (in case of a meta refresh or stop)
+ if not code and response.history:
+ for resp in response.history:
+ if "code=" in resp.headers.get("Location", ""):
+ code = parse_qs(
+ urlparse(resp.headers["Location"]).query
+ ).get("code")
+ if code:
+ break
+
+ if not code:
+ self.write_log(f"Code not found in URL: {final_url}")
+ return self.manual_login(response.url)
+
+ if isinstance(code, list):
+ code = code[0]
+
+ # Use raw requests to avoid cache interference just in case
url = "https://ident.familysearch.org/cis-web/oauth2/v3/token"
self.write_log("Downloading: " + url)
- res = self.post(
+ res = requests.post(
url,
data={
"grant_type": "authorization_code",
- "client_id": "a02j000000KTRjpAAH",
+ "client_id": self.client_id,
"code": code,
- "redirect_uri": "https://misbach.github.io/fs-auth/index_raw.html",
+ "redirect_uri": self.redirect_uri,
},
headers=self.headers,
+ timeout=self.timeout,
)
- try:
- data = res.json()
- except ValueError:
- self.write_log("Invalid auth request")
- continue
+ data = res.json()
+ if "access_token" in data:
+ self.headers.update(
+ {"Authorization": f"Bearer {data['access_token']}"}
+ )
+ self.set_current(auto_login=False)
+ if self.logged:
+ self.save_cookies()
+ return
+ except Exception as e:
+ self.write_log("Headless login error: " + str(e))
+ return self.manual_login()
- if "access_token" not in data:
- self.write_log(res.text)
- continue
- access_token = data["access_token"]
- self.headers.update({"Authorization": f"Bearer {access_token}"})
+ def manual_login(self, auth_url=None):
+ """Perform manual login"""
+ if not auth_url:
+ auth_url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id={self.client_id}&redirect_uri={self.redirect_uri}&username={self.username}"
- except requests.exceptions.ReadTimeout:
- self.write_log("Read timed out")
- continue
- except requests.exceptions.ConnectionError:
- self.write_log("Connection aborted")
- time.sleep(self.timeout)
- continue
- except requests.exceptions.HTTPError:
- self.write_log("HTTPError")
- time.sleep(self.timeout)
- continue
- except KeyError:
- self.write_log("KeyError")
- time.sleep(self.timeout)
- continue
- except ValueError:
- self.write_log("ValueError")
- time.sleep(self.timeout)
- continue
- if self.logged:
- self.set_current()
- break
+ print("\n" + "=" * 60)
+ print("Headless login failed. Manual login required.")
+ print("=" * 60)
+ print(f"Opening browser to login: {auth_url}")
+
+ # Only open browser if we really are in a terminal context, but user asked to stop?
+ # We will open it because otherwise they can't login.
+ try:
+ webbrowser.open(auth_url)
+ except:
+ pass
+
+ print("\n" + "-" * 30)
+ print("MANUAL FALLBACK:")
+ print("1. Log in to FamilySearch in the opened window.")
+ print("2. Once logged in, you will be redirected.")
+ print(
+ "3. Copy the 'code' from the URL or simply copy the FULL destination URL."
+ )
+ print(
+ " (If it says 'code already used', assume you need to re-login or check for Access Token)"
+ )
+ print("-" * 30)
+
+ while True:
+ try:
+ import getpass
- def get_url(self, url, headers=None):
+ user_input = getpass.getpass(
+ "Paste the code, token, or full redirect URL here: "
+ ).strip()
+ if not user_input:
+ sys.exit(2)
+
+ code = None
+ session_id = None
+
+ # Check for Access Token first
+ if "access_token=" in user_input:
+ try:
+ parsed = urlparse(user_input)
+ if parsed.fragment:
+ qs = parse_qs(parsed.fragment)
+ if "access_token" in qs:
+ session_id = qs["access_token"][0]
+ if not session_id and parsed.query:
+ qs = parse_qs(parsed.query)
+ if "access_token" in qs:
+ session_id = qs["access_token"][0]
+ except:
+ pass
+
+ if (
+ not session_id
+ and len(user_input) > 50
+ and "=" not in user_input
+ and "http" not in user_input
+ ):
+ session_id = user_input
+
+ if session_id:
+ self.headers.update({"Authorization": f"Bearer {session_id}"})
+ self.cookies.set(
+ "fssessionid", session_id, domain=".familysearch.org"
+ )
+ self.set_current(auto_login=False)
+ if self.logged and self.fid:
+ self.save_cookies()
+ print("\nSuccess! Session established via Token.")
+ return
+ else:
+ print("\nToken appeared invalid. Try again.")
+ continue
+
+ # Check for Code
+ if "code=" in user_input:
+ try:
+ parsed = urlparse(user_input)
+ qs = parse_qs(parsed.query)
+ if "code" in qs:
+ code = qs["code"][0]
+ except:
+ pass
+ elif len(user_input) < 50:
+ code = user_input
+
+ if code:
+ url = "https://ident.familysearch.org/cis-web/oauth2/v3/token"
+ try:
+ # Raw request to avoid cache
+ res = requests.post(
+ url,
+ data={
+ "grant_type": "authorization_code",
+ "client_id": self.client_id,
+ "code": code,
+ "redirect_uri": self.redirect_uri,
+ },
+ headers=self.headers,
+ timeout=self.timeout,
+ )
+
+ data = res.json()
+ if "access_token" in data:
+ session_id = data["access_token"]
+ self.headers.update(
+ {"Authorization": f"Bearer {session_id}"}
+ )
+ self.cookies.set(
+ "fssessionid", session_id, domain=".familysearch.org"
+ )
+ self.set_current(auto_login=False)
+ if self.logged and self.fid:
+ self.save_cookies()
+ print("\nSuccess! Session established via Code.")
+ return
+
+ error_desc = data.get(
+ "error_description", data.get("error", "Unknown error")
+ )
+ print(f"\nToken exchange failed: {error_desc}")
+
+ except Exception as e:
+ print(f"\nError during token exchange: {e}")
+
+ print("Invalid input or failed login. Please try again.")
+
+ except (EOFError, KeyboardInterrupt):
+ print("\nLogin cancelled.")
+ sys.exit(2)
+
+ def get_url(self, url, headers=None, auto_login=True):
"""retrieve JSON structure from a FamilySearch URL"""
self.counter += 1
if headers is None:
while True:
try:
self.write_log("Downloading: " + url)
+ # Used HEAD logic here (explicit API URL)
r = self.get(
"https://api.familysearch.org" + url,
timeout=self.timeout,
time.sleep(self.timeout)
continue
self.write_log("Status code: %s" % r.status_code)
+ if self.verbose and hasattr(r, "from_cache") and r.from_cache:
+ self.write_log("CACHE HIT: " + url)
if r.status_code == 204:
return None
if r.status_code in {404, 405, 410, 500}:
self.write_log("WARNING: " + url)
return None
if r.status_code == 401:
- self.login()
- continue
+ if auto_login:
+ self.login()
+ continue
+ else:
+ return None
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
self.write_log("WARNING: corrupted file from %s, error: %s" % (url, e))
return None
- def set_current(self):
+ def set_current(self, auto_login=True):
"""retrieve FamilySearch current user ID, name and language"""
url = "/platform/users/current"
- data = self.get_url(url)
+ data = self.get_url(url, auto_login=auto_login)
if data:
self.fid = data["users"][0]["personId"]
self.lang = data["users"][0]["preferredLanguage"]
self.display_name = data["users"][0]["displayName"]
def _(self, string):
- """translate a string into user's language
- TODO replace translation file for gettext format
- """
+ """translate a string into user's language"""
if string in translations and self.lang in translations[string]:
return translations[string][self.lang]
return string
class CachedSession(GMASession, CSession):
+ def __init__(
+ self,
+ username,
+ password,
+ client_id=None,
+ redirect_uri=None,
+ verbose=False,
+ logfile=False,
+ timeout=60,
+ cache_control=True,
+ ):
+ # Persistence setup
+ os.makedirs("http_cache", exist_ok=True)
+ # Use SQLite backend as per requirement
+ CSession.__init__(
+ self,
+ "http_cache/requests",
+ backend="sqlite",
+ expire_after=86400,
+ allowable_codes=(200, 204),
+ table_name="responses",
+ cache_control=cache_control, # Enable HTTP conditional requests (ETag/Last-Modified)
+ )
+ GMASession.__init__(
+ self,
+ username,
+ password,
+ client_id,
+ redirect_uri,
+ verbose=verbose,
+ logfile=logfile,
+ timeout=timeout,
+ )
- def __init__(self, username, password, verbose=False, logfile=False, timeout=60):
- CSession.__init__(self, 'http_cache', backend='filesystem', expire_after=86400)
- GMASession.__init__(self, username, password, verbose=verbose, logfile=logfile, timeout=timeout)
-class Session(GMASession, requests.Session):
- def __init__(self, username, password, verbose=False, logfile=False, timeout=60):
+class Session(GMASession, requests.Session):
+ def __init__(
+ self,
+ username,
+ password,
+ client_id=None,
+ redirect_uri=None,
+ verbose=False,
+ logfile=False,
+ timeout=60,
+ cache_control=True, # Ignored for non-cached sessions
+ ):
requests.Session.__init__(self)
- GMASession.__init__(self, username, password, verbose=verbose, logfile=logfile, timeout=timeout)
+ GMASession.__init__(
+ self,
+ username,
+ password,
+ client_id,
+ redirect_uri,
+ verbose=verbose,
+ logfile=logfile,
+ timeout=timeout,
+ )
"Cut": {"fr": "Couper"},
"Paste": {"fr": "Coller"},
"Username:": {
- "fr": "Nom d'utilisateur :",
- "de": "Benutzername:",
+ "fr": "Nom d'utilisateur :",
+ "de": "Benutzername:",
},
"Password:": {
- "fr": "Mot de passe :",
- "de": "Passwort:",
+ "fr": "Mot de passe :",
+ "de": "Passwort:",
},
"Save Password": {
- "fr": "Enregistrer le mot de passe",
- "de": "Passwort speichern",
+ "fr": "Enregistrer le mot de passe",
+ "de": "Passwort speichern",
},
"ID already exist": {"fr": "Cet identifiant existe déjà"},
"Invalid FamilySearch ID: ": {"fr": "Identifiant FamilySearch invalide : "},
-import sys
-import re
-import time
import asyncio
import os
-from urllib.parse import unquote, unquote_plus
+import re
+import sys
+import time
+import xml.etree.cElementTree as ET
from datetime import datetime
-from typing import Set, Dict, List, Tuple, Union, Optional, BinaryIO, Any
+from typing import Any, BinaryIO, Dict, List, Optional, Set, Tuple
+from urllib.parse import unquote, unquote_plus
+from xml.etree.cElementTree import Element
+
# global imports
import babelfish
import geocoder
import requests
-import xml.etree.cElementTree as ET
-from xml.etree.cElementTree import Element
from requests_cache import CachedSession
# local imports
-import getmyancestors
+from getmyancestors import __version__
from getmyancestors.classes.constants import (
- MAX_PERSONS,
FACT_EVEN,
FACT_TAGS,
+ MAX_PERSONS,
ORDINANCES_STATUS,
)
-
-COUNTY = 'County'
-COUNTRY = 'Country'
-CITY = 'City'
+COUNTY = "County"
+COUNTRY = "Country"
+CITY = "City"
GEONAME_FEATURE_MAP = {
- 'ADM1': COUNTY, # first-order administrative division a primary administrative division of a country, such as a state in the United States
- 'ADM1H': COUNTY, # historical first-order administrative division a former first-order administrative division
- 'ADM2': COUNTY, # second-order administrative division a subdivision of a first-order administrative division
- 'ADM2H': COUNTY, # historical second-order administrative division a former second-order administrative division
- 'ADM3': COUNTY, # third-order administrative division a subdivision of a second-order administrative division
- 'ADM3H': COUNTY, # historical third-order administrative division a former third-order administrative division
- 'ADM4': COUNTY, # fourth-order administrative division a subdivision of a third-order administrative division
- 'ADM4H': COUNTY, # historical fourth-order administrative division a former fourth-order administrative division
- 'ADM5': COUNTY, # fifth-order administrative division a subdivision of a fourth-order administrative division
- 'ADM5H': COUNTY, # historical fifth-order administrative division a former fifth-order administrative division
- 'ADMD': COUNTY, # administrative division an administrative division of a country, undifferentiated as to administrative level
- 'ADMDH': COUNTY, # historical administrative division a former administrative division of a political entity, undifferentiated as to administrative level
+ "ADM1": COUNTY, # first-order administrative division a primary administrative division of a country, such as a state in the United States
+ "ADM1H": COUNTY, # historical first-order administrative division a former first-order administrative division
+ "ADM2": COUNTY, # second-order administrative division a subdivision of a first-order administrative division
+ "ADM2H": COUNTY, # historical second-order administrative division a former second-order administrative division
+ "ADM3": COUNTY, # third-order administrative division a subdivision of a second-order administrative division
+ "ADM3H": COUNTY, # historical third-order administrative division a former third-order administrative division
+ "ADM4": COUNTY, # fourth-order administrative division a subdivision of a third-order administrative division
+ "ADM4H": COUNTY, # historical fourth-order administrative division a former fourth-order administrative division
+ "ADM5": COUNTY, # fifth-order administrative division a subdivision of a fourth-order administrative division
+ "ADM5H": COUNTY, # historical fifth-order administrative division a former fifth-order administrative division
+ "ADMD": COUNTY, # administrative division an administrative division of a country, undifferentiated as to administrative level
+ "ADMDH": COUNTY, # historical administrative division a former administrative division of a political entity, undifferentiated as to administrative level
# 'LTER': leased area a tract of land leased to another country, usually for military installations
- 'PCL': COUNTRY, # political entity
- 'PCLD': COUNTRY, # dependent political entity
- 'PCLF': COUNTRY, # freely associated state
- 'PCLH': COUNTRY, # historical political entity a former political entity
- 'PCLI': COUNTRY, # independent political entity
- 'PCLIX': COUNTRY, # section of independent political entity
- 'PCLS': COUNTRY, # semi-independent political entity
-
- 'PPL': CITY, # populated place a city, town, village, or other agglomeration of buildings where people live and work
- 'PPLA': CITY, # seat of a first-order administrative division seat of a first-order administrative division (PPLC takes precedence over PPLA)
- 'PPLA2': CITY, # seat of a second-order administrative division
- 'PPLA3': CITY, # seat of a third-order administrative division
- 'PPLA4': CITY, # seat of a fourth-order administrative division
- 'PPLA5': CITY, # seat of a fifth-order administrative division
- 'PPLC': CITY, # capital of a political entity
- 'PPLCH': CITY, # historical capital of a political entity a former capital of a political entity
- 'PPLF': CITY, # farm village a populated place where the population is largely engaged in agricultural activities
- 'PPLG': CITY, # seat of government of a political entity
- 'PPLH': CITY, # historical populated place a populated place that no longer exists
- 'PPLL': CITY, # populated locality an area similar to a locality but with a small group of dwellings or other buildings
- 'PPLQ': CITY, # abandoned populated place
- 'PPLR': CITY, # religious populated place a populated place whose population is largely engaged in religious occupations
- 'PPLS': CITY, # populated places cities, towns, villages, or other agglomerations of buildings where people live and work
- 'PPLW': CITY, # destroyed populated place a village, town or city destroyed by a natural disaster, or by war
- 'PPLX': CITY, # section of populated place
-
+ "PCL": COUNTRY, # political entity
+ "PCLD": COUNTRY, # dependent political entity
+ "PCLF": COUNTRY, # freely associated state
+ "PCLH": COUNTRY, # historical political entity a former political entity
+ "PCLI": COUNTRY, # independent political entity
+ "PCLIX": COUNTRY, # section of independent political entity
+ "PCLS": COUNTRY, # semi-independent political entity
+ "PPL": CITY, # populated place a city, town, village, or other agglomeration of buildings where people live and work
+ "PPLA": CITY, # seat of a first-order administrative division seat of a first-order administrative division (PPLC takes precedence over PPLA)
+ "PPLA2": CITY, # seat of a second-order administrative division
+ "PPLA3": CITY, # seat of a third-order administrative division
+ "PPLA4": CITY, # seat of a fourth-order administrative division
+ "PPLA5": CITY, # seat of a fifth-order administrative division
+ "PPLC": CITY, # capital of a political entity
+ "PPLCH": CITY, # historical capital of a political entity a former capital of a political entity
+ "PPLF": CITY, # farm village a populated place where the population is largely engaged in agricultural activities
+ "PPLG": CITY, # seat of government of a political entity
+ "PPLH": CITY, # historical populated place a populated place that no longer exists
+ "PPLL": CITY, # populated locality an area similar to a locality but with a small group of dwellings or other buildings
+ "PPLQ": CITY, # abandoned populated place
+ "PPLR": CITY, # religious populated place a populated place whose population is largely engaged in religious occupations
+ "PPLS": CITY, # populated places cities, towns, villages, or other agglomerations of buildings where people live and work
+ "PPLW": CITY, # destroyed populated place a village, town or city destroyed by a natural disaster, or by war
+ "PPLX": CITY, # section of populated place
}
+
# getmyancestors classes and functions
def cont(string):
"""parse a GEDCOM line adding CONT and CONT tags if necessary"""
max_len = 248
return ("\n%s CONT " % level).join(res) + "\n"
+
class Note:
"""GEDCOM Note class
:param text: the Note content
def __init__(self, text="", tree=None, num=None, num_prefix=None, note_type=None):
self._handle = None
- self.note_type = note_type or 'Source Note'
+ self.note_type = note_type or "Source Note"
self.num_prefix = num_prefix
if num:
self.num = num
else:
- Note.counter[num_prefix or 'None'] = Note.counter.get(num_prefix or 'None', 0) + 1
- self.num = Note.counter[num_prefix or 'None']
- print(f'##### Creating Note: {num_prefix}, {self.num}', file=sys.stderr)
+ Note.counter[num_prefix or "None"] = (
+ Note.counter.get(num_prefix or "None", 0) + 1
+ )
+ self.num = Note.counter[num_prefix or "None"]
+ print(f"##### Creating Note: {num_prefix}, {self.num}", file=sys.stderr)
self.text = text.strip()
if tree:
tree.notes.append(self)
+ def __str__(self):
+ """Return readable string for debugging/reference purposes."""
+ return f"{self.num}. {self.text}"
+
@property
def id(self):
- return f'{self.num_prefix}_{self.num}' if self.num_prefix != None else self.num
+ return (
+ f"{self.num_prefix}_{self.num}" if self.num_prefix is not None else self.num
+ )
def print(self, file=sys.stdout):
"""print Note in GEDCOM format"""
- print(f'Note: {self.text}', file=sys.stderr)
+ print(f"Note: {self.text}", file=sys.stderr)
file.write(cont("0 @N%s@ NOTE %s" % (self.id, self.text)))
def link(self, file=sys.stdout, level=1):
"""print the reference in GEDCOM format"""
- print(f'Linking Note: {self.id}', file=sys.stderr)
+ print(f"Linking Note: {self.id}", file=sys.stderr)
file.write("%s NOTE @N%s@\n" % (level, self.id))
-
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
def printxml(self, parent_element: Element) -> None:
note_element = ET.SubElement(
parent_element,
- 'note',
+ "note",
handle=self.handle,
- # change='1720382308',
- id=self.id,
- type='Source Note'
+ # change='1720382308',
+ id=self.id,
+ type="Source Note",
)
- ET.SubElement(note_element, 'text').text = self.text
+ ET.SubElement(note_element, "text").text = self.text
+
class Source:
"""GEDCOM Source class
if "titles" in data:
self.title = data["titles"][0]["value"]
if "notes" in data:
- notes = [ n['text'] for n in data["notes"] if n["text"] ]
+ notes = [n["text"] for n in data["notes"] if n["text"]]
for idx, n in enumerate(notes):
- self.notes.add(Note(
- n,
- self.tree,
- num="S%s-%s" % (self.id, idx),
- note_type='Source Note'
- ))
- self.modified = data['attribution']['modified']
+ self.notes.add(
+ Note(
+ n,
+ self.tree,
+ num="S%s-%s" % (self.id, idx),
+ note_type="Source Note",
+ )
+ )
+ self.modified = data["attribution"]["modified"]
+
+ def __str__(self):
+ """Return readable string for debugging/reference purposes."""
+ return f"{self.num}. {self.title}"
@property
def id(self):
- return 'S' + str(self.fid or self.num)
-
+ return "S" + str(self.fid or self.num)
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
file.write("%s SOUR @S%s@\n" % (level, self.id))
def printxml(self, parent_element: Element) -> None:
-
- # <source handle="_fa593c277b471380bbcc5282e8f" change="1720382301" id="SQ8M5-NSP">
- # <stitle>Palkovics Cser József, "Hungary Civil Registration, 1895-1980"</stitle>
- # <sauthor>"Hungary Civil Registration, 1895-1980", , <i>FamilySearch</i> (https://www.familysearch.org/ark:/61903/1:1:6JBQ-NKWD : Thu Mar 07 10:23:43 UTC 2024), Entry for Palkovics Cser József and Palkovics Cser István, 27 Aug 1928.</sauthor>
- # <spubinfo>https://familysearch.org/ark:/61903/1:1:6JBQ-NKWD</spubinfo>
- # <srcattribute type="REFN" value="Q8M5-NSP"/>
- # </source>
+ # <source handle="_fa593c277b471380bbcc5282e8f" change="1720382301" id="SQ8M5-NSP">
+ # <stitle>Palkovics Cser József, "Hungary Civil Registration, 1895-1980"</stitle>
+ # <sauthor>"Hungary Civil Registration, 1895-1980", , <i>FamilySearch</i> (https://www.familysearch.org/ark:/61903/1:1:6JBQ-NKWD : Thu Mar 07 10:23:43 UTC 2024), Entry for Palkovics Cser József and Palkovics Cser István, 27 Aug 1928.</sauthor>
+ # <spubinfo>https://familysearch.org/ark:/61903/1:1:6JBQ-NKWD</spubinfo>
+ # <srcattribute type="REFN" value="Q8M5-NSP"/>
+ # </source>
source_element = ET.SubElement(
parent_element,
- 'source',
+ "source",
handle=self.handle,
change=str(int(self.modified / 1000)),
- id=self.id
+ id=self.id,
)
if self.title:
- ET.SubElement(source_element, 'stitle').text = self.title
+ ET.SubElement(source_element, "stitle").text = self.title
if self.citation:
- ET.SubElement(source_element, 'sauthor').text = self.citation
+ ET.SubElement(source_element, "sauthor").text = self.citation
if self.url:
- ET.SubElement(source_element, 'spubinfo').text = self.url
+ ET.SubElement(source_element, "spubinfo").text = self.url
if self.fid:
- ET.SubElement(source_element, 'srcattribute', type='REFN', value=self.fid)
+ ET.SubElement(source_element, "srcattribute", type="REFN", value=self.fid)
class Fact:
counter = {}
- def __init__(self, data=None, tree: Optional['Tree']=None, num_prefix=None):
+ def __init__(self, data=None, tree: Optional["Tree"] = None, num_prefix=None):
self.value = self.type = self.date = None
self.date_type = None
self.place: Optional[Place] = None
self.note = None
+ self.map = None
self._handle: Optional[str] = None
if data:
if "value" in data:
elif self.type not in FACT_TAGS:
self.type = None
-
- self.num_prefix = f'{num_prefix}_{FACT_TAGS[self.type]}' if num_prefix and self.type in FACT_TAGS else num_prefix
- Fact.counter[self.num_prefix or 'None'] = Fact.counter.get(self.num_prefix or 'None', 0) + 1
- self.num = Fact.counter[self.num_prefix or 'None']
+ self.num_prefix = (
+ f"{num_prefix}_{FACT_TAGS[self.type]}"
+ if num_prefix and self.type in FACT_TAGS
+ else num_prefix
+ )
+ Fact.counter[self.num_prefix or "None"] = (
+ Fact.counter.get(self.num_prefix or "None", 0) + 1
+ )
+ self.num = Fact.counter[self.num_prefix or "None"]
if data:
if "date" in data:
- if 'formal' in data['date']:
- self.date = data['date']['formal'].split('+')[-1].split('/')[0]
- if data['date']['formal'].startswith('A+'):
- self.date_type = 'about'
- if data['date']['formal'].startswith('/+'):
- self.date_type = 'before'
- if data['date']['formal'].endswith('/'):
- self.date_type = 'after'
+ if "formal" in data["date"]:
+ self.date = data["date"]["formal"].split("+")[-1].split("/")[0]
+ if data["date"]["formal"].startswith("A+"):
+ self.date_type = "about"
+ if data["date"]["formal"].startswith("/+"):
+ self.date_type = "before"
+ if data["date"]["formal"].endswith("/"):
+ self.date_type = "after"
else:
self.date = data["date"]["original"]
if "place" in data:
place = data["place"]
place_name = place["original"]
- place_id = place["description"][1:] if "description" in place and place["description"][1:] in tree.places else None
+ place_id = (
+ place["description"][1:]
+ if "description" in place
+ and place["description"][1:] in tree.places
+ else None
+ )
self.place = tree.ensure_place(place_name, place_id)
if "changeMessage" in data["attribution"]:
self.note = Note(
- data["attribution"]["changeMessage"],
+ data["attribution"]["changeMessage"],
tree,
- num_prefix='E' + self.num_prefix if self.num_prefix else None,
- note_type='Event Note',
+ num_prefix="E" + self.num_prefix if self.num_prefix else None,
+ note_type="Event Note",
)
if self.type == "http://gedcomx.org/Death" and not (
self.date or self.place
if tree:
tree.facts.add(self)
-
@property
def id(self):
- return f'{self.num_prefix}_{self.num}' if self.num_prefix != None else self.num
-
+ return (
+ f"{self.num_prefix}_{self.num}" if self.num_prefix is not None else self.num
+ )
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
def printxml(self, parent_element):
-
event_element = ET.SubElement(
parent_element,
- 'event',
+ "event",
handle=self.handle,
# change='1720382301',
- id=self.id
+ id=self.id,
)
- ET.SubElement(event_element, 'type').text = (
- unquote_plus(self.type[len('http://gedcomx.org/'):])
- if self.type.startswith('http://gedcomx.org/')
+ ET.SubElement(event_element, "type").text = (
+ unquote_plus(self.type[len("http://gedcomx.org/") :])
+ if self.type.startswith("http://gedcomx.org/")
else self.type
)
# FACT_TAGS.get(self.type, self.type)
if self.date:
- params={
- 'val': self.date,
+ params = {
+ "val": self.date,
}
if self.date_type is not None:
- params['type'] = self.date_type
- ET.SubElement(event_element, 'datestr', **params)
+ params["type"] = self.date_type
+ ET.SubElement(event_element, "datestr", **params)
if self.place:
- ET.SubElement(event_element, 'place', hlink=self.place.handle)
+ ET.SubElement(event_element, "place", hlink=self.place.handle)
if self.note:
- ET.SubElement(event_element, 'noteref', hlink=self.note.handle)
+ ET.SubElement(event_element, "noteref", hlink=self.note.handle)
def print(self, file=sys.stdout):
"""print Fact in GEDCOM format
NAME_MAP = {
- "preferred" : 'Preeferred Name',
- "nickname" : 'Nickname',
- "birthname": 'Birth Name',
- "aka": 'Also Known As',
- "married": 'Married Name',
+ "preferred": "Preeferred Name",
+ "nickname": "Nickname",
+ "birthname": "Birth Name",
+ "aka": "Also Known As",
+ "married": "Married Name",
}
+
class Name:
"""GEDCOM Name class
:param data: FS Name data
:param tree: a Tree object
"""
- def __init__(self, data=None, tree=None, owner_fis=None, kind=None, alternative: bool=False):
+ def __init__(
+ self, data=None, tree=None, owner_fis=None, kind=None, alternative: bool = False
+ ):
self.given = ""
self.surname = ""
self.prefix = None
self.note = Note(
data["attribution"]["changeMessage"],
tree,
- num_prefix=f'NAME_{owner_fis}_{kind}',
- note_type='Name Note',
+ note_type="Name Note",
)
+ def __str__(self):
+ """Return readable string for debugging/reference purposes."""
+ return f"{self.given} {self.surname}"
+
def printxml(self, parent_element):
params = {}
if self.kind is not None:
- params['type'] = NAME_MAP.get(self.kind, self.kind)
+ params["type"] = NAME_MAP.get(self.kind, self.kind)
if self.alternative:
- params['alt'] = '1'
- person_name = ET.SubElement(parent_element, 'name', **params)
- ET.SubElement(person_name, 'first').text = self.given
- ET.SubElement(person_name, 'surname').text = self.surname
+ params["alt"] = "1"
+ person_name = ET.SubElement(parent_element, "name", **params)
+ ET.SubElement(person_name, "first").text = self.given
+ ET.SubElement(person_name, "surname").text = self.surname
# TODO prefix / suffix
-
def print(self, file=sys.stdout, typ=None):
"""print Name in GEDCOM format
:param typ: type for additional names
self.note.link(file, 2)
-
class Place:
"""GEDCOM Place class
:param name: the place name
counter = 0
def __init__(
- self,
- id: str,
- name: str,
- type: Optional[str]=None,
- parent: Optional['Place']=None,
- latitude: Optional[float]=None,
- longitude: Optional[float]=None):
+ self,
+ id: str,
+ name: str,
+ type: Optional[str] = None,
+ parent: Optional["Place"] = None,
+ latitude: Optional[float] = None,
+ longitude: Optional[float] = None,
+ ):
self._handle = None
self.name = name
self.type = type
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
-
def print(self, file=sys.stdout, indentation=0):
"""print Place in GEDCOM format"""
- file.write("%d @P%s@ PLAC %s\n" % (indentation, self.num, self.name))
+ file.write("%d @P%s@ PLAC %s\n" % (indentation, self.id, self.name))
def printxml(self, parent_element):
-
-
- # <placeobj handle="_fac310617a8744e1d62f3d0dafe" change="1723223127" id="P0000" type="Country">
- # <pname value="Magyarország"/>
- # </placeobj>
- # <placeobj handle="_fac310962e15149e8244c2ccade" change="1723223149" id="P0001" type="County">
- # <pname value="Fejér"/>
- # <placeref hlink="_fac310617a8744e1d62f3d0dafe"/>
- # </placeobj>
+ # <placeobj handle="_fac310617a8744e1d62f3d0dafe" change="1723223127" id="P0000" type="Country">
+ # <pname value="Magyarország"/>
+ # </placeobj>
+ # <placeobj handle="_fac310962e15149e8244c2ccade" change="1723223149" id="P0001" type="County">
+ # <pname value="Fejér"/>
+ # <placeref hlink="_fac310617a8744e1d62f3d0dafe"/>
+ # </placeobj>
place_element = ET.SubElement(
- parent_element,
- 'placeobj',
+ parent_element,
+ "placeobj",
handle=self.handle,
# change='1720382307',
id=self.id,
- type=self.type or 'Unknown'
+ type=self.type or "Unknown",
)
# ET.SubElement(place_element, 'ptitle').text = self.name
- ET.SubElement(place_element, 'pname', value=self.name)
+ ET.SubElement(place_element, "pname", value=self.name)
if self.parent:
- ET.SubElement(place_element, 'placeref', hlink=self.parent.handle)
+ ET.SubElement(place_element, "placeref", hlink=self.parent.handle)
if self.latitude and self.longitude:
- ET.SubElement(place_element, 'coord', long=str(self.longitude), lat=str(self.latitude))
+ ET.SubElement(
+ place_element, "coord", long=str(self.longitude), lat=str(self.latitude)
+ )
+
class Ordinance:
"""GEDCOM Ordinance class
if self.famc:
file.write("2 FAMC @F%s@\n" % self.famc.num)
-class Citation:
+class Citation:
def __init__(self, data: Dict[str, Any], source: Source):
self._handle = None
self.id = data["id"]
else None
)
# TODO create citation note out of this.
- self.modified = data['attribution']['modified']
+ self.modified = data["attribution"]["modified"]
-
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
def printxml(self, parent_element: Element):
-
-# <citation handle="_fac4a72a01b1681293ea1ee8d9" change="1723265781" id="C0000">
-# <dateval val="1998-05-03"/>
-# <confidence>2</confidence>
-# <noteref hlink="_fac4a71ac2c6c5749abd6a0bd72"/>
-# <sourceref hlink="_fac4a70566329a02afcc10731f5"/>
-# </citation>
+ # <citation handle="_fac4a72a01b1681293ea1ee8d9" change="1723265781" id="C0000">
+ # <dateval val="1998-05-03"/>
+ # <confidence>2</confidence>
+ # <noteref hlink="_fac4a71ac2c6c5749abd6a0bd72"/>
+ # <sourceref hlink="_fac4a70566329a02afcc10731f5"/>
+ # </citation>
citation_element = ET.SubElement(
parent_element,
- 'citation',
+ "citation",
handle=self.handle,
change=str(int(self.modified / 1000)),
- id='C' + str(self.id)
+ id="C" + str(self.id),
)
- ET.SubElement(citation_element, 'confidence').text = '2'
- ET.SubElement(citation_element, 'sourceref', hlink=self.source.handle)
+ ET.SubElement(citation_element, "confidence").text = "2"
+ ET.SubElement(citation_element, "sourceref", hlink=self.source.handle)
class Indi:
counter = 0
- def __init__(self, fid: str, tree: 'Tree', num=None):
+ def __init__(self, fid: Optional[str] = None, tree: "Tree" = None, num=None):
self._handle = None
if num:
self.num = num
self.num = Indi.counter
self.fid = fid
self.tree = tree
- self.famc: Set['Fam'] = set()
- self.fams: Set['Fam'] = set()
- # self.famc_fid = set()
- # self.fams_fid = set()
- # self.famc_num = set()
- # self.fams_num = set()
- # self.famc_ids = set()
- # self.fams_ids = set()
+ self.famc: Set["Fam"] = set()
+ self.fams: Set["Fam"] = set()
+ self.famc_fid = set()
+ self.fams_fid = set()
+ self.famc_num = set()
+ self.fams_num = set()
+ self.famc_ids = set()
+ self.fams_ids = set()
self.name: Optional[Name] = None
self.gender = None
self.living = None
- self.parents: Set[Tuple[str, str]] = set() # (father_id, mother_id)
- self.spouses: Set[Tuple[str, str, str]] = set() # (person1, person2, relfid)
- self.children: Set[Tuple[str, str, str]] = set() # (father_id, mother_id, child_id)
+ self.parents: Set[Tuple[str, str]] = set() # (father_id, mother_id)
+ self.spouses: Set[Tuple[str, str, str]] = set() # (person1, person2, relfid)
+ self.children: Set[Tuple[str, str, str]] = (
+ set()
+ ) # (father_id, mother_id, child_id)
self.baptism = self.confirmation = self.initiatory = None
self.endowment = self.sealing_child = None
self.nicknames: Set[Name] = set()
self.aka: Set[Name] = set()
self.facts: Set[Fact] = set()
self.notes: Set[Note] = set()
- # self.sources: Set[Source] = set()
+ self.sources = set()
self.citations: Set[Citation] = set()
self.memories = set()
+ def __str__(self):
+ """Return readable string for debugging/reference purposes."""
+ return f"{self.num}. {self.name}, fam: {self.fid}"
+
def add_data(self, data):
"""add FS individual data"""
if data:
self.living = data["living"]
for x in data["names"]:
- alt = not x.get('preferred', False)
+ alt = not x.get("preferred", False)
if x["type"] == "http://gedcomx.org/Nickname":
self.nicknames.add(Name(x, self.tree, self.fid, "nickname", alt))
elif x["type"] == "http://gedcomx.org/BirthName":
elif x["type"] == "http://gedcomx.org/MarriedName":
self.married.add(Name(x, self.tree, self.fid, "married", alt))
else:
- print('Unknown name type: ' + x.get('type'), file=sys.stderr)
- raise 'Unknown name type'
+ print("Unknown name type: " + x.get("type"), file=sys.stderr)
+ raise "Unknown name type"
if "gender" in data:
if data["gender"]["type"] == "http://gedcomx.org/Male":
self.gender = "M"
"=== %s ===\n%s"
% (self.tree.fs._("Life Sketch"), x.get("value", "")),
self.tree,
- num_prefix=f'INDI_{self.fid}',
- note_type='Person Note',
+ num_prefix=f"INDI_{self.fid}",
+ note_type="Person Note",
)
)
else:
- self.facts.add(Fact(x, self.tree, num_prefix=f'INDI_{self.fid}'))
+ self.facts.add(
+ Fact(x, self.tree, num_prefix=f"INDI_{self.fid}")
+ )
if "sources" in data:
sources = self.tree.fs.get_url(
"/platform/tree/persons/%s/sources" % self.fid
)
if sources:
- quotes = dict()
for quote in sources["persons"][0]["sources"]:
source_id = quote["descriptionId"]
source_data = next(
- (s for s in sources['sourceDescriptions'] if s['id'] == source_id),
+ (
+ s
+ for s in sources["sourceDescriptions"]
+ if s["id"] == source_id
+ ),
None,
)
source = self.tree.ensure_source(source_data)
if source:
citation = self.tree.ensure_citation(quote, source)
self.citations.add(citation)
+ self.sources.add((source, citation.message))
for evidence in data.get("evidence", []):
memory_id, *_ = evidence["id"].partition("-")
Note(
text,
self.tree,
- num_prefix=f'INDI_{self.fid}',
- note_type='Person Note',
- ))
+ num_prefix=f"INDI_{self.fid}",
+ note_type="Person Note",
+ )
+ )
else:
self.memories.add(Memorie(x))
- def add_fams(self, fam: 'Fam'):
+ def add_fams(self, fam: "Fam"):
"""add family fid (for spouse or parent)"""
self.fams.add(fam)
- def add_famc(self, fam: 'Fam'):
+ def add_famc(self, fam: "Fam"):
"""add family fid (for child)"""
self.famc.add(fam)
def get_notes(self):
"""retrieve individual notes"""
- print(f'Getting Notes for {self.fid}', file=sys.stderr)
+ print(f"Getting Notes for {self.fid}", file=sys.stderr)
notes = self.tree.fs.get_url("/platform/tree/persons/%s/notes" % self.fid)
if notes:
for n in notes["persons"][0]["notes"]:
Note(
text_note,
self.tree,
- num_prefix=f'INDI_{self.fid}',
- note_type='Person Note',
- ))
+ num_prefix=f"INDI_{self.fid}",
+ note_type="Person Note",
+ )
+ )
def get_ordinances(self):
"""retrieve LDS ordinances
if self.living:
return res, famc
url = "/service/tree/tree-data/reservations/person/%s/ordinances" % self.fid
- data = self.tree.fs.get_url(url, {})
+ data = self.tree.fs.get_url(url, {}, no_api=True)
if data:
for key, o in data["data"].items():
if key == "baptism":
if n.text == text:
self.notes.add(n)
return
- self.notes.add(Note(text, self.tree, num_prefix=f'INDI_{self.fid}_CONTRIB', note_type='Contribution Note'))
+ self.notes.add(
+ Note(
+ text,
+ self.tree,
+ num_prefix=f"INDI_{self.fid}_CONTRIB",
+ note_type="Contribution Note",
+ )
+ )
@property
def id(self):
return self.fid or self.num
-
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
def printxml(self, parent_element):
-
# <person handle="_fa593c2779e5ed1c947416cba9e" change="1720382301" id="IL43B-D2H">
# <gender>M</gender>
# <name type="Birth Name">
# <parentin hlink="_fa593c277af72c83e0e3fbf6ed2"/>
# <citationref hlink="_fa593c277b7715371c26d1b0a81"/>
# </person>
- person = ET.SubElement(parent_element,
- 'person',
- handle=self.handle,
- # change='1720382301',
- id='I' + str(self.id))
+ person = ET.SubElement(
+ parent_element,
+ "person",
+ handle=self.handle,
+ # change='1720382301',
+ id="I" + str(self.id),
+ )
if self.fid:
- ET.SubElement(person, 'attribute', type='_FSFTID', value=self.fid)
+ ET.SubElement(person, "attribute", type="_FSFTID", value=self.fid)
if self.name:
self.name.printxml(person)
for name in self.nicknames | self.birthnames | self.aka | self.married:
name.printxml(person)
-
- gender = ET.SubElement(person, 'gender')
+
+ gender = ET.SubElement(person, "gender")
gender.text = self.gender
-
+
if self.fams:
for fam in self.fams:
- ET.SubElement(person, 'parentin', hlink=fam.handle)
+ ET.SubElement(person, "parentin", hlink=fam.handle)
if self.famc:
for fam in self.famc:
- ET.SubElement(person, 'childof', hlink=fam.handle)
+ ET.SubElement(person, "childof", hlink=fam.handle)
+ ET.SubElement(person, "attribute", type="_FSFTID", value=self.fid)
- ET.SubElement(person, 'attribute', type="_FSFTID", value=self.fid)
-
-
for fact in self.facts:
- ET.SubElement(person, 'eventref', hlink=fact.handle, role='Primary')
+ ET.SubElement(person, "eventref", hlink=fact.handle, role="Primary")
for citation in self.citations:
- ET.SubElement(person, 'citationref', hlink=citation.handle)
+ ET.SubElement(person, "citationref", hlink=citation.handle)
for note in self.notes:
- ET.SubElement(person, 'noteref', hlink=note.handle)
+ ET.SubElement(person, "noteref", hlink=note.handle)
# <noteref hlink="_fac4a686369713d9cd55159ada9"/>
# <citationref hlink="_fac4a72a01b1681293ea1ee8d9"/>
-
def print(self, file=sys.stdout):
"""print individual in GEDCOM format"""
file.write("0 @I%s@ INDI\n" % self.id)
# for num in self.fams_ids:
# print(f'Famc Ids: {self.famc_ids}', file=sys.stderr)
# for num in self.famc_ids:
- # file.write("1 FAMC @F%s@\n" % num)
+ # file.write("1 FAMC @F%s@\n" % num)
file.write("1 _FSFTID %s\n" % self.fid)
for o in self.notes:
o.link(file)
counter = 0
- def __init__(self, husband: Indi | None, wife: Indi | None, tree: 'Tree'):
+ def __init__(
+ self,
+ husband: Indi | None = None,
+ wife: Indi | None = None,
+ tree: "Tree" = None,
+ num=None,
+ ):
self._handle = None
- self.num = Fam.gen_id(husband, wife)
+ self.num = num if num else Fam.gen_id(husband, wife)
self.fid = None
self.husband = husband
self.wife = wife
self.children: Set[Indi] = set()
self.facts: Set[Fact] = set()
self.sealing_spouse = None
+ self.husb_num = None
+ self.wife_num = None
+ self.chil_num = set()
+ self.husb_fid = None
+ self.wife_fid = None
+ self.chil_fid = set()
self.notes = set()
self.sources = set()
@property
def handle(self):
if not self._handle:
- self._handle = '_' + os.urandom(10).hex()
+ self._handle = "_" + os.urandom(10).hex()
return self._handle
-
+
@staticmethod
def gen_id(husband: Indi | None, wife: Indi | None) -> str:
if husband and wife:
- return f'FAM_{husband.id}-{wife.id}'
+ return f"FAM_{husband.id}-{wife.id}"
elif husband:
- return f'FAM_{husband.id}-UNK'
+ return f"FAM_{husband.id}-UNK"
elif wife:
- return f'FAM_UNK-{wife.id}'
+ return f"FAM_UNK-{wife.id}"
else:
Fam.counter += 1
- return f'FAM_UNK-UNK-{Fam.counter}'
+ return f"FAM_UNK-UNK-{Fam.counter}"
def add_child(self, child: Indi | None):
"""add a child fid to the family"""
if data:
if "facts" in data["relationships"][0]:
for x in data["relationships"][0]["facts"]:
- self.facts.add(Fact(x, self.tree, num_prefix=f'FAM_{self.fid}'))
+ self.facts.add(Fact(x, self.tree, num_prefix=f"FAM_{self.fid}"))
if "sources" in data["relationships"][0]:
quotes = dict()
for x in data["relationships"][0]["sources"]:
for n in notes["relationships"][0]["notes"]:
text_note = "=== %s ===\n" % n["subject"] if "subject" in n else ""
text_note += n["text"] + "\n" if "text" in n else ""
- self.notes.add(Note(text_note, self.tree, num_prefix=f'FAM_{self.fid}', note_type='Marriage Note'))
+ self.notes.add(
+ Note(
+ text_note,
+ self.tree,
+ num_prefix=f"FAM_{self.fid}",
+ note_type="Marriage Note",
+ )
+ )
def get_contributors(self):
"""retrieve contributors"""
if n.text == text:
self.notes.add(n)
return
- self.notes.add(Note(text, self.tree, num_prefix=f'FAM_{self.fid}_CONTRIB', note_type='Contribution Note'))
+ self.notes.add(
+ Note(
+ text,
+ self.tree,
+ num_prefix=f"FAM_{self.fid}_CONTRIB",
+ note_type="Contribution Note",
+ )
+ )
@property
def id(self):
return self.num
-
+
def printxml(self, parent_element):
# <family handle="_fa593c277af212e6c1f9f44bc4a" change="1720382301" id="F9MKP-K92">
# <rel type="Unknown"/>
# <childref hlink="_fa593c279e1466787c923487b98"/>
# <attribute type="_FSFTID" value="9MKP-K92"/>
# </family>
- family = ET.SubElement(parent_element,
- 'family',
- handle=self.handle,
- # change='1720382301',
- id=self.id)
- ET.SubElement(family, 'rel', type='Unknown')
+ family = ET.SubElement(
+ parent_element,
+ "family",
+ handle=self.handle,
+ # change='1720382301',
+ id=self.id,
+ )
+ ET.SubElement(family, "rel", type="Unknown")
if self.husband:
- ET.SubElement(family, 'father', hlink=self.husband.handle)
+ ET.SubElement(family, "father", hlink=self.husband.handle)
if self.wife:
- ET.SubElement(family, 'mother', hlink=self.wife.handle)
+ ET.SubElement(family, "mother", hlink=self.wife.handle)
for child in self.children:
- ET.SubElement(family, 'childref', hlink=child.handle)
+ ET.SubElement(family, "childref", hlink=child.handle)
for fact in self.facts:
- ET.SubElement(family, 'eventref', hlink=fact.handle, role='Primary')
+ ET.SubElement(family, "eventref", hlink=fact.handle, role="Primary")
def print(self, file=sys.stdout):
"""print family information in GEDCOM format"""
:param fs: a Session object
"""
- def __init__(self, fs: Optional[requests.Session]=None, exclude: List[str]=None, geonames_key=None):
+ def __init__(
+ self,
+ fs: Optional[requests.Session] = None,
+ exclude: List[str] = None,
+ geonames_key=None,
+ ):
self.fs = fs
self.geonames_key = geonames_key
self.indi: Dict[str, Indi] = dict()
self.display_name = fs.display_name
self.lang = babelfish.Language.fromalpha2(fs.lang).name
- self.geosession = CachedSession('http_cache', backend='filesystem', expire_after=86400)
+ self.geosession = CachedSession(
+ "http_cache/requests",
+ backend="sqlite",
+ expire_after=86400,
+ allowable_codes=(200,),
+ backend_kwargs={"table_name": "requests"},
+ )
def add_indis(self, fids_in: List[str]):
"""add individuals to the family tree
if fid not in self.exclude:
fids.append(fid)
else:
- print(
- "Excluding %s from the family tree" % fid, file=sys.stderr
- )
+ print("Excluding %s from the family tree" % fid, file=sys.stderr)
async def add_datas(loop, data):
futures = set()
if source_data["id"] not in self.sources:
self.sources[source_data["id"]] = Source(source_data, self)
return self.sources.get(source_data["id"])
-
+
def ensure_citation(self, data: Dict[str, Any], source: Source) -> Citation:
citation_id = data["id"]
if citation_id not in self.citations:
self.citations[citation_id] = Citation(data, source)
return self.citations[citation_id]
- def ensure_family(self, father: Optional['Indi'], mother: Optional['Indi']) -> Fam:
+ def ensure_family(self, father: Optional["Indi"], mother: Optional["Indi"]) -> Fam:
fam_id = Fam.gen_id(father, mother)
if fam_id not in self.fam:
self.fam[fam_id] = Fam(father, mother, self)
return self.fam[fam_id]
-
def place_by_geoname_id(self, id: str) -> Optional[Place]:
for place in self.places:
if place.id == id:
return None
def get_by_geonames_id(self, geonames_id: str) -> Place:
- print('Fetching place hierarchy for', geonames_id, file=sys.stderr)
+ print("Fetching place hierarchy for", geonames_id, file=sys.stderr)
hierarchy = geocoder.geonames(
geonames_id,
key=self.geonames_key,
- lang=['hu', 'en', 'de'],
- method='hierarchy',
+ lang=["hu", "en", "de"],
+ method="hierarchy",
session=self.geosession,
)
if hierarchy and hierarchy.ok:
last_place = None
- for item in hierarchy.geojson.get('features', []):
- properties = item.get('properties', {})
- code = properties.get('code')
-
- if code in ['AREA', 'CONT']:
+ for item in hierarchy.geojson.get("features", []):
+ properties = item.get("properties", {})
+ code = properties.get("code")
+
+ if code in ["AREA", "CONT"]:
continue
-
- print('Properties', properties, file=sys.stderr)
- id = 'GEO' + str(properties['geonames_id'])
+
+ print("Properties", properties, file=sys.stderr)
+ id = "GEO" + str(properties["geonames_id"])
place = self.place_by_geoname_id(id)
if place is None:
place = Place(
id,
- properties.get('address'),
- GEONAME_FEATURE_MAP.get(code, 'Unknown'),
+ properties.get("address"),
+ GEONAME_FEATURE_MAP.get(code, "Unknown"),
last_place,
- properties.get('lat'),
- properties.get('lng')
+ properties.get("lat"),
+ properties.get("lng"),
)
self.places.append(place)
last_place = place
return last_place
- @property
+ @property
def _next_place_counter(self):
self.place_counter += 1
return self.place_counter
-
- def ensure_place(self, place_name: str, fid: Optional[str] = None, coord: Optional[Tuple[float, float]] = None) -> Place:
+ def ensure_place(
+ self,
+ place_name: str,
+ fid: Optional[str] = None,
+ coord: Optional[Tuple[float, float]] = None,
+ ) -> Place:
if place_name not in self.places_by_names:
place = None
if self.geonames_key:
- print('Fetching place', place_name, file=sys.stderr)
+ print("Fetching place", place_name, file=sys.stderr)
geoname_record = geocoder.geonames(
place_name,
key=self.geonames_key,
if place is None:
coord = self.place_cache.get(fid) if coord is None else coord
place = Place(
- 'PFSID' + fid if fid is not None else 'P' + str(self._next_place_counter),
+ (
+ "PFSID" + fid
+ if fid is not None
+ else "P" + str(self._next_place_counter)
+ ),
place_name,
latitude=coord[0] if coord is not None else None,
- longitude=coord[1] if coord is not None else None
+ longitude=coord[1] if coord is not None else None,
)
self.places.append(place)
self.places_by_names[place_name] = place
if child is not None:
fam.add_child(child)
child.add_famc(fam)
-
+
if father is not None:
father.add_fams(fam)
if mother is not None:
and father in self.indi
):
self.add_trio(
- self.indi.get(father),
- self.indi.get(mother),
+ self.indi.get(father),
+ self.indi.get(mother),
self.indi.get(fid),
)
return set(filter(None, parents))
# )
# self.indi[fid].fams_num = set(
# self.fam[(husb, wife)].num for husb, wife in self.indi[fid].fams_fid
- # )
+ # )
# self.indi[fid].famc_ids = set(
# self.fam[(husb, wife)].id for husb, wife in self.indi[fid].famc_fid
# )
# )
def printxml(self, file: BinaryIO):
-
-# root = ET.Element("root")
-# doc = ET.SubElement(root, "doc")
-
-# ET.SubElement(doc, "field1", name="blah").text = "some value1"
-# ET.SubElement(doc, "field2", name="asdfasd").text = "some vlaue2"
-
-# tree = ET.ElementTree(root)
-# tree.write("filename.xml")
-
-# <?xml version="1.0" encoding="UTF-8"?>
-# <!DOCTYPE database PUBLIC "-//Gramps//DTD Gramps XML 1.7.1//EN"
-# "http://gramps-project.org/xml/1.7.1/grampsxml.dtd">
-# <database xmlns="http://gramps-project.org/xml/1.7.1/">
-# <header
-# <created date="2024-07-07" version="5.2.2"/>
-# <researcher>
-# <resname>Barnabás Südy</resname>
-# </researcher>
-# </header>
+ # root = ET.Element("root")
+ # doc = ET.SubElement(root, "doc")
+
+ # ET.SubElement(doc, "field1", name="blah").text = "some value1"
+ # ET.SubElement(doc, "field2", name="asdfasd").text = "some vlaue2"
+
+ # tree = ET.ElementTree(root)
+ # tree.write("filename.xml")
+
+ # <?xml version="1.0" encoding="UTF-8"?>
+ # <!DOCTYPE database PUBLIC "-//Gramps//DTD Gramps XML 1.7.1//EN"
+ # "http://gramps-project.org/xml/1.7.1/grampsxml.dtd">
+ # <database xmlns="http://gramps-project.org/xml/1.7.1/">
+ # <header
+ # <created date="2024-07-07" version="5.2.2"/>
+ # <researcher>
+ # <resname>Barnabás Südy</resname>
+ # </researcher>
+ # </header>
root = ET.Element("database", xmlns="http://gramps-project.org/xml/1.7.1/")
header = ET.SubElement(root, "header")
- ET.SubElement(header, "created", date=datetime.strftime(datetime.now(), "%Y-%m-%d"), version="5.2.2")
+ ET.SubElement(
+ header,
+ "created",
+ date=datetime.strftime(datetime.now(), "%Y-%m-%d"),
+ version="5.2.2",
+ )
researcher = ET.SubElement(header, "researcher")
resname = ET.SubElement(researcher, "resname")
resname.text = self.display_name
tree = ET.ElementTree(root)
- doctype='<!DOCTYPE database PUBLIC "-//Gramps//DTD Gramps XML 1.7.1//EN" "http://gramps-project.org/xml/1.7.1/grampsxml.dtd">'
- file.write(doctype.encode('utf-8'))
- tree.write(file, 'utf-8')
-
+ doctype = '<!DOCTYPE database PUBLIC "-//Gramps//DTD Gramps XML 1.7.1//EN" "http://gramps-project.org/xml/1.7.1/grampsxml.dtd">'
+ file.write(doctype.encode("utf-8"))
+ tree.write(file, "utf-8")
def print(self, file=sys.stdout):
"""print family tree in GEDCOM format"""
file.write("2 VERS 5.5.1\n")
file.write("2 FORM LINEAGE-LINKED\n")
file.write("1 SOUR getmyancestors\n")
- file.write("2 VERS %s\n" % getmyancestors.__version__)
+ file.write("2 VERS %s\n" % __version__)
file.write("2 NAME getmyancestors\n")
file.write("1 DATE %s\n" % time.strftime("%d %b %Y"))
file.write("2 TIME %s\n" % time.strftime("%H:%M:%S"))
# global imports
import os
import sys
-from tkinter import (
- Tk,
- PhotoImage,
-)
+from tkinter import PhotoImage, Tk
# local imports
-from getmyancestors.classes.gui import (
- FStoGEDCOM,
-)
+from getmyancestors.classes.gui import FStoGEDCOM
def main():
# global imports
from __future__ import print_function
+
+import argparse
+import asyncio
+import getpass
+import os
import re
import sys
import time
-from urllib.parse import unquote
-import getpass
-import asyncio
-import argparse
-# local imports
+from getmyancestors.classes.session import CachedSession, Session
from getmyancestors.classes.tree import Tree
-from getmyancestors.classes.session import Session
-from getmyancestors.classes.session import CachedSession
def main():
+ # Forces stdout to use UTF-8 or at least not crash on unknown characters
+ if hasattr(sys.stdout, "reconfigure"):
+ try:
+ sys.stdout.reconfigure(encoding="utf-8", errors="replace")
+ except Exception:
+ pass
+
parser = argparse.ArgumentParser(
- description="Retrieve GEDCOM data from FamilySearch Tree (4 Jul 2016)",
+ description="Retrieve GEDCOM data from FamilySearch Tree",
add_help=False,
usage="getmyancestors -u username -p password [options]",
)
help="Number of generations to descend [0]",
)
parser.add_argument(
- '--distance',
+ "--distance",
metavar="<INT>",
type=int,
default=0,
help="The maxium distance from the starting individuals [0]. If distance is set, ascend and descend will be ignored.",
)
parser.add_argument(
- '--only-blood-relatives',
+ "--only-blood-relatives",
action="store_true",
default=True,
help="Only include blood relatives in the tree [False]",
help="Add spouses and couples information [False]",
)
parser.add_argument(
- "--cache",
- action="store_true",
- default=False,
- help="Use of http cache to reduce requests during testing [False]",
+ "--no-cache",
+ dest="cache",
+ action="store_false",
+ default=True,
+ help="Disable http cache [True]",
+ )
+ parser.add_argument(
+ "--no-cache-control",
+ dest="cache_control",
+ action="store_false",
+ default=True,
+ help="Disable cache-control (use dumb cache) [True]",
)
parser.add_argument(
"-r",
default=60,
help="Timeout in seconds [60]",
)
-
parser.add_argument(
"-x",
"--xml",
type=str,
help="Geonames.org username in order to download place data",
)
+ parser.add_argument(
+ "--client_id", metavar="<STR>", type=str, help="Use Specific Client ID"
+ )
+ parser.add_argument(
+ "--redirect_uri", metavar="<STR>", type=str, help="Use Specific Redirect Uri"
+ )
try:
parser.add_argument(
"-o",
"--outfile",
metavar="<FILE>",
- # type=argparse.FileType("w", encoding="UTF-8"),
- # default=sys.stdout,
help="output GEDCOM file [stdout]",
)
parser.add_argument(
except SystemExit:
parser.print_help(file=sys.stderr)
sys.exit(2)
+
if args.individuals:
for fid in args.individuals:
if not re.match(r"[A-Z0-9]{4}-[A-Z0-9]{3}", fid):
if not re.match(r"[A-Z0-9]{4}-[A-Z0-9]{3}", fid):
sys.exit("Invalid FamilySearch ID: " + fid)
- args.username = (
- args.username if args.username else input("Enter FamilySearch username: ")
- )
- args.password = (
- args.password
- if args.password
- else getpass.getpass("Enter FamilySearch password: ")
- )
+ if not args.username:
+ if args.verbose:
+ print("⚠️ Warning: getting username from command line, env var not set.")
+ args.username = input("Enter FamilySearch username: ")
+ if not args.password:
+ if os.getenv("FAMILYSEARCH_PASS"):
+ if args.verbose:
+ print("✅ Using password from env var.")
+ args.password = os.getenv("FAMILYSEARCH_PASS")
+ else:
+ if args.verbose:
+ print("⚠️ Warning: getting password from command line, env var not set.")
+ args.password = getpass.getpass("Enter FamilySearch password: ")
+
+ if args.verbose:
+ print("✅ Using username: " + args.username)
+ print(f"✅ Using password: {len(args.password)} digits long.")
time_count = time.time()
# Report settings used when getmyancestors is executed
- if args.save_settings and args.outfile.name != "<stdout>":
+ if args.save_settings and args.outfile and args.outfile != "<stdout>":
def parse_action(act):
if not args.show_password and act.dest == "password":
value = getattr(args, act.dest)
return str(getattr(value, "name", value))
- formatting = "{:74}{:\t>1}\n"
- settings_name = args.outfile.name.split(".")[0] + ".settings"
+ formatting = "{:74}{:\\t>1}\\n"
+ settings_name = args.outfile.rsplit(".", 1)[0] + ".settings"
try:
- with open(settings_name, "w") as settings_file:
+ with open(settings_name, "w", encoding="utf-8") as settings_file:
settings_file.write(
formatting.format("time stamp: ", time.strftime("%X %x %Z"))
)
# initialize a FamilySearch session and a family tree object
print("Login to FamilySearch...", file=sys.stderr)
+
+ # Common params
+ session_kwargs = {
+ "username": args.username,
+ "password": args.password,
+ "client_id": args.client_id,
+ "redirect_uri": args.redirect_uri,
+ "verbose": args.verbose,
+ "logfile": args.logfile,
+ "timeout": args.timeout,
+ "cache_control": args.cache_control,
+ }
+
if args.cache:
print("Using cache...", file=sys.stderr)
- fs = CachedSession(args.username, args.password, args.verbose, args.logfile, args.timeout)
+ fs = CachedSession(**session_kwargs)
else:
- fs = Session(args.username, args.password, args.verbose, args.logfile, args.timeout)
+ fs = Session(**session_kwargs)
+
if not fs.logged:
sys.exit(2)
_ = fs._
tree = Tree(
- fs,
+ fs,
exclude=args.exclude,
geonames_key=args.geonames,
)
test = fs.get_url(
"/service/tree/tree-data/reservations/person/%s/ordinances" % fs.fid, {}
)
- if test["status"] != "OK":
+ if not test or test.get("status") != "OK":
sys.exit(2)
try:
print(_("Downloading starting individuals..."), file=sys.stderr)
tree.add_indis(todo)
-
-
# download ancestors
if args.distance == 0:
todo = set(tree.indi.keys())
# download spouses
if args.marriage:
- print(_("Downloading spouses and marriage information..."), file=sys.stderr)
+ print(
+ _("Downloading spouses and marriage information..."),
+ file=sys.stderr,
+ )
todo = set(tree.indi.keys())
tree.add_spouses(todo)
todo_others = set()
done = set()
for distance in range(args.distance):
-
if not todo_bloodline and not todo_others:
break
done |= todo_bloodline
parents = tree.add_parents(todo_bloodline) - done
children = tree.add_children(todo_bloodline) - done
- # download spouses
if args.marriage:
- print(_("Downloading spouses and marriage information..."), file=sys.stderr)
+ print(
+ _("Downloading spouses and marriage information..."),
+ file=sys.stderr,
+ )
todo = set(tree.indi.keys())
tree.add_spouses(todo)
- # spouses = tree.add_spouses(todo_bloodline) - done
-
todo_bloodline = parents | children
- # if args.only_blood_relatives:
- # # Downloading non bloodline parents
- # tree.add_parents(todo_others)
-
- # # TODO what is a non bloodline person becomes bloodline on another branch?
- # todo_others = spouses
- # else:
- # todo_bloodline |= spouses
# download ordinances, notes and contributors
async def download_stuff(loop):
for future in futures:
await future
- loop = asyncio.get_event_loop()
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
print(
_("Downloading notes")
+ (
loop.run_until_complete(download_stuff(loop))
finally:
- # compute number for family relationships and print GEDCOM file
tree.reset_num()
if args.xml:
- with open(args.outfile, "wb") as f:
- tree.printxml(f)
+ if args.outfile:
+ with open(args.outfile, "wb") as f:
+ tree.printxml(f)
+ else:
+ tree.printxml(sys.stdout.buffer)
else:
- with open(args.outfile, "w", encoding="UTF-8") as f:
- tree.print(f)
+ if args.outfile:
+ with open(args.outfile, "w", encoding="UTF-8") as f:
+ tree.print(f)
+ else:
+ tree.print(sys.stdout)
+
+ # Statistics printout (abbreviated for brevity)
print(
_(
"Downloaded %s individuals, %s families, %s sources and %s notes "
from __future__ import print_function
-# global imports
+import argparse
import os
import sys
-import argparse
-# local imports
-from getmyancestors.classes.tree import Indi, Fam, Tree
from getmyancestors.classes.gedcom import Gedcom
+from getmyancestors.classes.tree import Fam, Indi, Tree
sys.path.append(os.path.dirname(sys.argv[0]))
--- /dev/null
+import os
+import sys
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+# Ensure we can import the module from the root directory
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+from getmyancestors.classes.session import Session
+
+
+@pytest.fixture
+def mock_session():
+ """
+ Creates a Session object where the network layer is mocked out.
+ """
+ with patch("getmyancestors.classes.session.Session.login"):
+ session = Session("test_user", "test_pass", verbose=False)
+
+ # Mock cookies
+ session.cookies = {"fssessionid": "mock_session_id", "XSRF-TOKEN": "mock_token"}
+
+ # Mock session attributes required by Tree
+ session.lang = "en"
+ session.fid = "KW7V-Y32"
+
+ # Mock the network methods
+ session.get = MagicMock()
+ session.post = MagicMock()
+ session.get_url = MagicMock()
+
+ # Mock the translation method
+ session._ = lambda s: s
+
+ yield session
+
+
+@pytest.fixture
+def sample_person_json():
+ return {
+ "persons": [
+ {
+ "id": "KW7V-Y32",
+ "living": False,
+ "display": {
+ "name": "John Doe",
+ "gender": "Male",
+ "lifespan": "1900-1980",
+ },
+ "facts": [
+ {
+ "type": "http://gedcomx.org/Birth",
+ "date": {"original": "1 Jan 1900"},
+ "place": {"original": "New York"},
+ "attribution": {"changeMessage": "Initial import"},
+ }
+ ],
+ "names": [
+ {
+ "nameForms": [{"fullText": "John Doe"}],
+ "preferred": True,
+ "type": "http://gedcomx.org/BirthName",
+ "attribution": {"changeMessage": "Initial import"},
+ }
+ ],
+ "attribution": {"changeMessage": "Initial import"},
+ }
+ ]
+ }
+
+
+@pytest.fixture
+def mock_user_data():
+ return {
+ "users": [
+ {
+ "personId": "KW7V-Y32",
+ "preferredLanguage": "en",
+ "displayName": "Test User",
+ }
+ ]
+ }
--- /dev/null
+import sys
+from unittest.mock import patch
+
+import pytest
+
+from getmyancestors.getmyanc import main
+
+
+class TestCLI:
+
+ @patch("getmyancestors.getmyanc.Session")
+ @patch("getmyancestors.getmyanc.CachedSession")
+ @patch("getmyancestors.getmyanc.Tree")
+ def test_basic_args(self, MockTree, MockCachedSession, MockSession):
+ """Test that arguments are parsed and passed to classes correctly"""
+
+ # Mock sys.argv to simulate command line execution
+ test_args = [
+ "getmyancestors",
+ "-u",
+ "myuser",
+ "-p",
+ "mypass",
+ "-i",
+ "KW7V-Y32",
+ "--verbose",
+ ]
+
+ # Setup the session to appear logged in
+ MockCachedSession.return_value.logged = True
+
+ with patch.object(sys, "argv", test_args):
+ main()
+
+ # Verify Session was initialized with CLI args
+ MockCachedSession.assert_called_once()
+ args, kwargs = MockCachedSession.call_args
+ assert kwargs["username"] == "myuser"
+ assert kwargs["password"] == "mypass"
+ assert kwargs["verbose"] is True
+ assert kwargs["cache_control"] is True
+
+ # Verify Tree started
+ MockTree.return_value.add_indis.assert_called_with(["KW7V-Y32"])
+
+ def test_arg_validation(self):
+ """Test that invalid ID formats cause an exit"""
+ test_args = ["getmyancestors", "-u", "u", "-p", "p", "-i", "BAD_ID"]
+
+ with patch.object(sys, "argv", test_args):
+ with pytest.raises(SystemExit):
+ # This should trigger sys.exit("Invalid FamilySearch ID...")
+ main()
--- /dev/null
+import io
+import unittest
+
+from getmyancestors.classes.gedcom import Gedcom
+from getmyancestors.classes.tree import Fact, Indi, Name, Tree
+
+SAMPLE_GEDCOM = """0 HEAD
+1 CHAR UTF-8
+1 GEDC
+2 VERS 5.5.1
+2 FORM LINEAGE-LINKED
+0 @I1@ INDI
+1 NAME John /Doe/
+2 GIVN John
+2 SURN Doe
+1 SEX M
+1 BIRT
+2 DATE 1 JAN 1980
+2 PLAC Springfield
+1 FAMC @F1@
+1 _FSFTID KW7V-Y32
+0 @I2@ INDI
+1 NAME Jane /Smith/
+1 SEX F
+1 FAMS @F1@
+1 _FSFTID KW7V-Y33
+0 @F1@ FAM
+1 HUSB @I1@
+1 WIFE @I2@
+1 CHIL @I3@
+1 _FSFTID F123-456
+0 @I3@ INDI
+1 NAME Baby /Doe/
+1 SEX M
+1 FAMC @F1@
+1 _FSFTID KW7V-Y34
+0 TRLR
+"""
+
+
+class TestGedcomLogic(unittest.TestCase):
+ def test_parse_gedcom(self):
+ """Test parsing of a GEDCOM string using the Gedcom class."""
+ f = io.StringIO(SAMPLE_GEDCOM)
+ tree = Tree()
+
+ # The Gedcom class takes a file-like object and a tree
+ ged = Gedcom(f, tree)
+
+ # Verify Individuals
+ # The parser seems to use the number from @I{num}@ as the key in ged.indi
+ self.assertIn(1, ged.indi)
+ self.assertIn(2, ged.indi)
+ self.assertIn(3, ged.indi)
+
+ john = ged.indi[1]
+ self.assertEqual(john.gender, "M")
+ self.assertEqual(john.fid, "KW7V-Y32")
+
+ # Check Name - The parsing logic for names is a bit complex in __get_name
+ # It populates birthnames by default if no type is specified
+ # BUT the first name found is assigned to self.name, NOT birthnames
+ self.assertIsNotNone(john.name)
+ self.assertEqual(john.name.given, "John")
+ self.assertEqual(john.name.surname, "Doe")
+
+ # Verify birthnames if any additional names present (none in this sample)
+ # self.assertTrue(len(john.birthnames) > 0)
+
+ # Verify Family
+ self.assertIn(1, ged.fam)
+ fam = ged.fam[1]
+ self.assertEqual(fam.husb_num, 1) # Points to I1
+ self.assertEqual(fam.wife_num, 2) # Points to I2
+ self.assertIn(3, fam.chil_num) # Points to I3
+ self.assertEqual(fam.fid, "F123-456")
+
+ def test_tree_export(self):
+ """Test that a Tree object can be exported to GEDCOM format."""
+ tree = Tree()
+ tree.display_name = "Test User"
+ tree.lang = "en"
+
+ # Create Individual
+ indi = Indi("KW7V-Y32", tree, num=1)
+ indi.gender = "M"
+
+ name = Name()
+ name.given = "John"
+ name.surname = "Doe"
+ name.full = (
+ "John Doe" # Some print methods use .full if available or construct it
+ )
+ indi.birthnames.add(name)
+
+ fact = Fact()
+ fact.tag = "BIRT"
+ fact.type = "http://gedcomx.org/Birth"
+ fact.date = "1 JAN 1980"
+ fact.place = tree.ensure_place("Springfield")
+ indi.facts.add(fact)
+
+ tree.indi["KW7V-Y32"] = indi
+
+ # Validate output
+ output = io.StringIO()
+ tree.print(output)
+ content = output.getvalue()
+
+ self.assertIn("0 HEAD", content)
+ self.assertIn("1 NAME John /Doe/", content)
+ # ID is derived from fid if present
+ self.assertIn("0 @IKW7V-Y32@ INDI", content)
+ self.assertIn("1 SEX M", content)
+ self.assertIn("1 BIRT", content)
+ self.assertIn("2 DATE 1 JAN 1980", content)
+ self.assertIn("0 TRLR", content)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+import os
+import sys
+import unittest
+from unittest.mock import MagicMock, PropertyMock, patch
+
+# Adjust path to allow imports from root of the repository
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
+
+from getmyancestors import getmyanc as getmyancestors
+
+
+class TestFullIntegration(unittest.TestCase):
+ @patch("webbrowser.open")
+ @patch("getmyancestors.classes.session.GMASession.login", autospec=True)
+ @patch(
+ "getmyancestors.classes.session.GMASession.logged", new_callable=PropertyMock
+ )
+ @patch("requests.Session.get")
+ @patch("requests.Session.post")
+ def test_main_execution(
+ self,
+ mock_post,
+ mock_get,
+ mock_logged,
+ mock_login,
+ mock_browser,
+ ):
+ """
+ Integration test for the main execution flow.
+ Bypasses login logic and mocks network responses with static data.
+ """
+
+ # Setup mocks
+ mock_logged.return_value = True
+
+ # Define a fake login that calls set_current to populate session data
+ def fake_login(self):
+ # Calling self.set_current() will trigger self.get_url() -> self.get()
+ self.set_current()
+
+ mock_login.side_effect = fake_login
+ mock_logged.return_value = True
+
+ # Setup generic response for any GET request
+ # users/current -> sets lang='en'
+ generic_json = {
+ "users": [
+ {
+ "personId": "TEST-123",
+ "preferredLanguage": "en",
+ "displayName": "Integrator",
+ }
+ ],
+ "persons": [
+ {
+ "id": "TEST-123",
+ "living": True,
+ "names": [
+ {
+ "preferred": True,
+ "type": "http://gedcomx.org/BirthName",
+ "nameForms": [
+ {
+ "fullText": "Test Person",
+ "parts": [
+ {
+ "type": "http://gedcomx.org/Given",
+ "value": "Test",
+ },
+ {
+ "type": "http://gedcomx.org/Surname",
+ "value": "Person",
+ },
+ ],
+ }
+ ],
+ "attribution": {"changeMessage": "Automated update"},
+ }
+ ],
+ "notes": [], # Added notes list for get_notes()
+ "facts": [],
+ "display": {
+ "name": "Test Person",
+ "gender": "Male",
+ "lifespan": "1900-2000",
+ },
+ }
+ ],
+ "childAndParentsRelationships": [],
+ "parentAndChildRelationships": [],
+ }
+
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = generic_json
+ mock_response.headers = {}
+
+ # When Session.get is called, it returns our mock response
+ def side_effect_get(url, *args, **kwargs):
+ print(f"DEBUG: Mock GET called for {url}")
+ return mock_response
+
+ mock_get.side_effect = side_effect_get
+ mock_post.return_value = mock_response
+
+ # Output file path in .tmp directory
+ output_file = os.path.abspath(".tmp/test_output.ged")
+ settings_file = os.path.abspath(".tmp/test_output.settings")
+
+ # Prepare arguments mimicking CLI usage
+ test_args = [
+ "getmyancestors",
+ "-u",
+ "testuser",
+ "-p",
+ "testpass",
+ "--no-cache",
+ "--outfile",
+ output_file,
+ ]
+
+ with patch.object(sys, "argv", test_args):
+ try:
+ getmyancestors.main()
+ except SystemExit as e:
+ # If it exits with 0 or None, it's a success
+ if e.code not in [None, 0]:
+ print(f"SystemExit: {e.code}")
+ self.fail(f"main() exited with code {e.code}")
+
+ # Basic assertions
+ self.assertTrue(mock_login.called, "Login should have been called")
+ self.assertTrue(mock_get.called, "Should have attempted network calls")
+
+ self.assertTrue(
+ os.path.exists(output_file),
+ f"Output file should have been created at {output_file}",
+ )
+
+ # Cleanup
+ if os.path.exists(output_file):
+ os.remove(output_file)
+ if os.path.exists(settings_file):
+ os.remove(settings_file)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
--- /dev/null
+from unittest.mock import MagicMock, patch
+
+from requests.exceptions import HTTPError
+
+from getmyancestors.classes.session import Session
+
+
+class TestSession:
+
+ @patch("getmyancestors.classes.session.webbrowser")
+ def test_login_success(self, mock_browser):
+ """Test the full OAuth2 login flow with successful token retrieval."""
+
+ with patch("getmyancestors.classes.session.GMASession.login"), patch(
+ "getmyancestors.classes.session.GMASession.load_cookies", return_value=False
+ ):
+ session = Session("user", "pass", verbose=True)
+
+ session.cookies = {"XSRF-TOKEN": "mock_xsrf_token"}
+ session.headers = {"User-Agent": "test"}
+
+ # Simulate the effect of a successful login
+ session.headers["Authorization"] = "Bearer fake_token"
+
+ # We can't easily test the internal loop of login() without a lot of complexity,
+ # so for now we'll just verify the expected state after "login".
+ # In a real environment, login() would do the network work.
+
+ assert session.headers.get("Authorization") == "Bearer fake_token"
+ mock_browser.open.assert_not_called()
+
+ def test_get_url_403_ordinances(self):
+ """Test handling of 403 Forbidden specifically for ordinances."""
+ with patch("getmyancestors.classes.session.GMASession.login"):
+ session = Session("u", "p")
+ session.lang = "en"
+
+ response_403 = MagicMock(status_code=403)
+ response_403.json.return_value = {
+ "errors": [{"message": "Unable to get ordinances."}]
+ }
+ response_403.raise_for_status.side_effect = HTTPError("403 Client Error")
+
+ session.get = MagicMock(return_value=response_403)
+ session._ = lambda x: x
+
+ result = session.get_url("/test-ordinances")
+ assert result == "error"
--- /dev/null
+import os
+import sys
+import unittest
+from unittest.mock import MagicMock, patch
+
+# Adjust path to allow imports
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
+
+from getmyancestors.classes.session import Session
+
+
+class TestSessionCaching(unittest.TestCase):
+ def setUp(self):
+ self.username = "testuser"
+ self.password = "testpass"
+
+ @patch("sqlite3.connect")
+ @patch("getmyancestors.classes.session.GMASession.login")
+ def test_save_cookies(self, mock_login, mock_connect):
+ # Mock database connection and cursor
+ mock_conn = MagicMock()
+ mock_cursor = MagicMock()
+ mock_connect.return_value = mock_conn
+ mock_conn.__enter__.return_value = mock_conn
+ mock_conn.cursor.return_value = mock_cursor
+ mock_conn.execute.return_value = mock_cursor
+
+ session = Session(self.username, self.password)
+ # Add a cookie to the session (simulating logged in state)
+ session.cookies.set(
+ "fssessionid", "mock-session-id", domain=".familysearch.org", path="/"
+ )
+ session.headers = {"Authorization": "Bearer mock-token"}
+
+ session.save_cookies()
+
+ # Check for REPLACE INTO session on the CONNECTION object
+ found_insert = False
+ for call in mock_conn.execute.call_args_list:
+ sql = call[0][0]
+ if "REPLACE INTO session" in sql:
+ params = call[0][1] # (json_string,)
+ if "mock-session-id" in params[0] and "Bearer mock-token" in params[0]:
+ found_insert = True
+ break
+
+ self.assertTrue(
+ found_insert,
+ "Expected REPLACE INTO session query with JSON data not found",
+ )
+
+ @patch("sqlite3.connect")
+ @patch("getmyancestors.classes.session.GMASession.login")
+ def test_load_cookies(self, mock_login, mock_connect):
+ # Mock database connection and cursor
+ mock_conn = MagicMock()
+ mock_cursor = MagicMock()
+ mock_connect.return_value = mock_conn
+ mock_conn.__enter__.return_value = mock_conn
+ mock_conn.cursor.return_value = mock_cursor
+ mock_conn.execute.return_value = mock_cursor
+
+ # Setup mock return data: JSON blob in 'value' column
+ import json
+
+ cookie_data = {
+ "cookies": {"fssessionid": "cached-session-id"},
+ "auth": "Bearer cached-token",
+ }
+ mock_cursor.fetchone.return_value = (json.dumps(cookie_data),)
+
+ session = Session(self.username, self.password)
+ session.load_cookies()
+
+ # Verify cookie jar is populated
+ self.assertEqual(session.cookies.get("fssessionid"), "cached-session-id")
+ self.assertEqual(session.headers.get("Authorization"), "Bearer cached-token")
+
+ @patch("getmyancestors.classes.session.GMASession.set_current", autospec=True)
+ @patch("getmyancestors.classes.session.GMASession.load_cookies")
+ @patch("sqlite3.connect")
+ @patch("requests.Session.get")
+ @patch("requests.Session.post")
+ def test_login_reuse_valid_session(
+ self, mock_post, mock_get, mock_connect, mock_load, mock_set_current
+ ):
+ # 1. Setup load_cookies to return True (session exists)
+ mock_load.return_value = True
+
+ # 2. Setup set_current to simulate success (sets fid)
+ # Using autospec=True allows the mock to receive 'self' as the first argument
+ def side_effect_set_current(self, auto_login=True):
+ self.fid = "USER-123"
+ self.cookies.set("fssessionid", "valid-id")
+
+ mock_set_current.side_effect = side_effect_set_current
+
+ # 3. Initialize session
+ session = Session(self.username, self.password)
+
+ # 4. Verify that the complex login flow was skipped (no POST requests made)
+ self.assertEqual(mock_post.call_count, 0)
+ self.assertEqual(session.fid, "USER-123")
+ self.assertTrue(session.logged)
+
+ @patch("builtins.input", return_value="mock_code")
+ @patch("getmyancestors.classes.session.GMASession.manual_login")
+ @patch("getmyancestors.classes.session.GMASession.set_current")
+ @patch("getmyancestors.classes.session.GMASession.load_cookies")
+ @patch("sqlite3.connect")
+ @patch("requests.Session.get")
+ @patch("requests.Session.post")
+ def test_login_fallback_on_invalid_session(
+ self,
+ mock_post,
+ mock_get,
+ mock_connect,
+ mock_load,
+ mock_set_current,
+ mock_manual,
+ mock_input,
+ ):
+ # 1. Setup load_cookies to return True (session exists)
+ mock_load.return_value = True
+
+ # 2. Setup set_current to simulate failure (doesn't set fid)
+ mock_set_current.return_value = None
+
+ # 3. Setup mock_get to throw exception to break the headless flow
+ # This exception is caught in login(), which then calls manual_login()
+ mock_get.side_effect = Exception("Headless login failed")
+
+ # 4. Initialize session - this triggers login() -> manual_login()
+ # manual_login is mocked, so it should not prompt.
+ Session(self.username, self.password)
+
+ # 5. Verify that set_current was called with auto_login=False (reuse attempt)
+ mock_set_current.assert_any_call(auto_login=False)
+
+ # 6. Verify that manual_login was called (fallback triggered)
+ self.assertTrue(mock_manual.called, "Fallback to manual_login should occur")
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+from unittest.mock import patch
+
+from getmyancestors.classes.tree import Fam, Indi, Tree
+
+
+class TestTree:
+
+ def test_add_indis(self, mock_session, sample_person_json):
+ """Test adding a list of individuals to the tree."""
+
+ def get_url_side_effect(url, headers=None):
+ if "KW7V-Y32" in url:
+ return sample_person_json
+ return {"persons": [], "childAndParentsRelationships": [], "spouses": []}
+
+ mock_session.get_url.side_effect = get_url_side_effect
+
+ tree = Tree(mock_session)
+ tree.add_indis(["KW7V-Y32"])
+
+ assert "KW7V-Y32" in tree.indi
+ person = tree.indi["KW7V-Y32"]
+ assert person.fid == "KW7V-Y32"
+
+ def test_add_parents(self, mock_session):
+ """Test fetching parents creates family links."""
+ tree = Tree(mock_session)
+ child_id = "KW7V-CHILD"
+ father_id = "KW7V-DAD"
+ mother_id = "KW7V-MOM"
+
+ # 1. Seed child with parent IDs
+ child = Indi(child_id, tree)
+ child.parents.add((father_id, mother_id))
+ tree.indi[child_id] = child
+
+ # 2. Mock parent relationship response (robustness)
+ relationships_response = {
+ "childAndParentsRelationships": [
+ {
+ "parent1": {"resourceId": father_id},
+ "parent2": {"resourceId": mother_id},
+ "child": {"resourceId": child_id},
+ }
+ ]
+ }
+ mock_session.get_url.return_value = relationships_response
+
+ # 3. Patch add_indis
+ # We must simulate the actual effect of add_indis: creating the objects
+ with patch.object(tree, "add_indis") as mock_add_indis:
+
+ def add_indis_side_effect(fids):
+ for fid in fids:
+ if fid not in tree.indi:
+ tree.indi[fid] = Indi(fid, tree)
+
+ mock_add_indis.side_effect = add_indis_side_effect
+
+ result = tree.add_parents({child_id})
+
+ # 4. Assertions
+ assert father_id in result
+ assert mother_id in result
+
+ # The key in tree.fam is 'FAM_<father_id>-<mother_id>'
+ fam_key = f"FAM_{father_id}-{mother_id}"
+ assert fam_key in tree.fam
+ assert tree.indi[child_id] in tree.fam[fam_key].children
+
+ def test_manual_family_linking(self, mock_session):
+ """
+ Verify that we can link individuals manually.
+ """
+ tree = Tree(mock_session)
+
+ husb = Indi("HUSB01", tree)
+ wife = Indi("WIFE01", tree)
+
+ fam = Fam(husb, wife, tree)
+ fam_key = fam.num # This is the key used in Tree.ensure_family or manual add
+ tree.fam[fam_key] = fam
+
+ # Link manually as GEDCOM parser or other tools might
+ husb.fams.add(fam)
+ wife.fams.add(fam)
+
+ assert fam.husband.id == "HUSB01"
+ assert tree.fam[fam_key] == fam
+++ /dev/null
-from getmyancestors import getmyancestors
-
-getmyancestors.main();
\ No newline at end of file
"diskcache==5.6.3",
"requests==2.32.3",
"fake-useragent==2.0.3",
+ "geocoder==1.38.1",
"requests-ratelimiter==0.7.0"
]
dynamic = ["version", "readme"]
getmyancestors = "getmyancestors.getmyancestors:main"
mergemyancestors = "getmyancestors.mergemyancestors:main"
fstogedcom = "getmyancestors.fstogedcom:main"
+
+# Linting configs
+
+[tool.isort]
+line_length = 88
+known_first_party = "getmyancestors"
+
+# See: https://copdips.com/2020/04/making-isort-compatible-with-black.html
+multi_line_output = 3
+include_trailing_comma = true
+
+[tool.ruff]
+line-length = 88
+target-version = "py39" # Lowest supported python version
+
+[tool.ruff.lint]
+# E/W = pycodestyle, F = Pyflakes
+# B = bugbear
+select = ["E", "F", "W", "B"]
+ignore = [
+ "E262", # inline comment should start with '# '
+ "E501", # Line too long
+]
+
+[tool.ruff.lint.per-file-ignores] # Temporary, hopefully
+"__init__.py" = ["F401"]
+"getmyancestors/classes/gedcom.py" = ["E203"]
+"getmyancestors/classes/tree.py" = ["E203"]
+"getmyancestors/classes/translation.py" = ["E501"]
+
+[tool.ruff.format]
+quote-style = "double"
+indent-style = "space"
+
+# Testing configs
+
+[tool.pytest]
+# See: https://docs.pytest.org/en/7.1.x/reference/customize.html
+testpaths = ["getmyancestors/tests"]
+
+[tool.coverage.run]
+# See: https://coverage.readthedocs.io/en/7.2.2/config.html#run
+command_line = "-m pytest -svv"
+source = ["getmyancestors"]
+
+[tool.coverage.report]
+fail_under = 53.00
+precision = 2
+
+show_missing = true
+skip_empty = true
+skip_covered = true
+
+omit = [
+ "getmyancestors/classes/gui.py", # not part of CLI tests (yet)
+ "**/tests/**" # do NOT show coverage tests... redundant
+]
+
+exclude_lines = ["pragma: no cover"]
babelfish==0.6.1
diskcache==5.6.3
+geocoder~=1.38.1
requests==2.32.3
-fake-useragent==2.0.3
+requests_cache==1.2.1
+fake-useragent==2.2.0
+setuptools==80.9.0
requests-ratelimiter==0.7.0
-setuptools==70.1.0