# global imports
import sys
import time
+from urllib.parse import urlparse, parse_qs
import requests
from fake_useragent import UserAgent
self.write_log("Downloading: " + url)
res = self.get(url, headers=self.headers)
res.raise_for_status()
+
+ url = f"https://ident.familysearch.org/cis-web/oauth2/v3/authorization?response_type=code&scope=openid profile email qualifies_for_affiliate_account country&client_id=a02j000000KTRjpAAH&redirect_uri=https://misbach.github.io/fs-auth/index_raw.html&username={self.username}"
+ self.write_log("Downloading: " + url)
+ response = self.get(url, allow_redirects=False, headers=self.headers)
+ location = response.headers["location"]
+ code = parse_qs(urlparse(location).query).get("code")
+ url = "https://ident.familysearch.org/cis-web/oauth2/v3/token"
+ self.write_log("Downloading: " + url)
+ res = self.post(
+ url,
+ data={
+ "grant_type": "authorization_code",
+ "client_id": "a02j000000KTRjpAAH",
+ "code": code,
+ "redirect_uri": "https://misbach.github.io/fs-auth/index_raw.html",
+ },
+ headers=self.headers,
+ )
+
+ try:
+ data = res.json()
+ except ValueError:
+ self.write_log("Invalid auth request")
+ continue
+
+ if "access_token" not in data:
+ self.write_log(res.text)
+ continue
+ access_token = data["access_token"]
+ self.headers.update({"Authorization": f"Bearer {access_token}"})
+
except requests.exceptions.ReadTimeout:
self.write_log("Read timed out")
continue
try:
self.write_log("Downloading: " + url)
r = self.get(
- "https://familysearch.org" + url,
+ "https://api.familysearch.org" + url,
timeout=self.timeout,
headers=headers,
)
:param fs: a Session object
"""
- def __init__(self, fs=None):
+ def __init__(self, fs=None, exclude=None):
self.fs = fs
self.indi = dict()
self.fam = dict()
self.sources = dict()
self.places = dict()
self.display_name = self.lang = None
+ self.exclude = exclude or []
if fs:
self.display_name = fs.display_name
self.lang = babelfish.Language.fromalpha2(fs.lang).name
- def add_indis(self, fids):
+ def add_indis(self, fids_in):
"""add individuals to the family tree
:param fids: an iterable of fid
"""
+ fids = []
+ for fid in fids_in:
+ if fid not in self.exclude:
+ fids.append(fid)
+ else:
+ print(
+ "Excluding %s from the family tree" % fid, file=sys.stderr
+ )
async def add_datas(loop, data):
futures = set()
type=str,
help="List of individual FamilySearch IDs for whom to retrieve ancestors",
)
+ parser.add_argument(
+ "-e",
+ "--exclude",
+ metavar="<STR>",
+ nargs="+",
+ type=str,
+ help="List of individual FamilySearch IDs to exclude from the tree",
+ )
parser.add_argument(
"-a",
"--ascend",
for fid in args.individuals:
if not re.match(r"[A-Z0-9]{4}-[A-Z0-9]{3}", fid):
sys.exit("Invalid FamilySearch ID: " + fid)
+ if args.exclude:
+ for fid in args.exclude:
+ if not re.match(r"[A-Z0-9]{4}-[A-Z0-9]{3}", fid):
+ sys.exit("Invalid FamilySearch ID: " + fid)
args.username = (
args.username if args.username else input("Enter FamilySearch username: ")
if not fs.logged:
sys.exit(2)
_ = fs._
- tree = Tree(fs)
+ tree = Tree(fs, exclude=args.exclude)
# check LDS account
if args.get_ordinances: