From f77d5e50f7a8bc54088678358f43af8eff2826b9 Mon Sep 17 00:00:00 2001 From: Shane Jaroch Date: Thu, 25 Dec 2025 22:13:53 -0500 Subject: [PATCH] fixup! more tidying up, linting, and formatting --- .github/workflows/testing.yaml | 2 +- Makefile | 10 +++ ffpass/__init__.py | 141 +++++++++++++++++++++------------ 3 files changed, 102 insertions(+), 51 deletions(-) diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index 7b2bff4..d00081b 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -29,7 +29,7 @@ jobs: - name: Lint with flake8 run: | pip install flake8 - flake8 --ignore=E741,E501 . + flake8 . --exclude='*venv,build' --ignore=E741,E501 - name: Upload Unit Test Results if: always() diff --git a/Makefile b/Makefile index 75736b1..2390acd 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +SHELL:=/bin/bash + .PHONY: pypi pypi: dist twine upload dist/* @@ -11,6 +13,8 @@ dist: flake8 flake8: flake8 . --exclude '*venv,build' --count --select=E901,E999,F821,F822,F823 --show-source --statistics flake8 . --exclude '*venv,build' --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + # CI pipeline + flake8 . --exclude='*venv,build' --ignore=E741,E501 .PHONY: install @@ -27,3 +31,9 @@ test: .PHONY: clean clean: rm -rf *.egg-info build dist + rm -f .coverage + find . \ + -name .venv -prune \ + -o -name __pycache__ -print \ + -o -name .pytest_cache -print \ + | xargs -r rm -rf diff --git a/ffpass/__init__.py b/ffpass/__init__.py index 7a10ff7..c655291 100644 --- a/ffpass/__init__.py +++ b/ffpass/__init__.py @@ -72,7 +72,8 @@ def censor(data): return s third = length // 3 - return f"{s[:third]}.....{s[2*third:]}" + two_thirds = (2 * length) // 3 + return f"{s[:third]}.....{s[two_thirds:]}" def clean_iv(iv_bytes): @@ -148,24 +149,11 @@ def decrypt_key_entry(a11, global_salt, master_password): return None -def get_all_keys(directory, pwd=""): - db = Path(directory) / "key4.db" - if not db.exists(): - raise NoDatabase() - - conn = sqlite3.connect(str(db)) - c = conn.cursor() - - # 1. Get Global Salt - c.execute("SELECT item1, item2 FROM metadata WHERE id = 'password'") - try: - global_salt, item2 = next(c) - except StopIteration: - raise NoDatabase() - - logging.info(f"[*] Global Salt: {censor(global_salt)}") - - # 2. VERIFY PASSWORD EXPLICITLY +def verify_password(global_salt, item2, pwd): + """ + Verifies the master password against the metadata entry (item2). + Raises WrongPassword on failure. + """ try: decodedItem2, _ = der_decode(item2) try: @@ -198,6 +186,26 @@ def get_all_keys(directory, pwd=""): logging.debug(f"Password check failed: {e}") raise WrongPassword() + +def get_all_keys(directory, pwd=""): + db = Path(directory) / "key4.db" + if not db.exists(): + raise NoDatabase() + + conn = sqlite3.connect(str(db)) + c = conn.cursor() + + # 1. Get Global Salt + c.execute("SELECT item1, item2 FROM metadata WHERE id = 'password'") + try: + global_salt, item2 = next(c) + except StopIteration: + raise NoDatabase() + + logging.info(f"[*] Global Salt: {censor(global_salt)}") + + # 2. VERIFY PASSWORD EXPLICITLY + verify_password(global_salt, item2, pwd) logging.info("[*] Password Verified Correctly") # 3. Find ALL Keys @@ -205,7 +213,7 @@ def get_all_keys(directory, pwd=""): rows = c.fetchall() logging.info(f"[*] Found {len(rows)} entries in nssPrivate") - # FIX: Check if rows exist BEFORE assuming corruption. + # Check if rows exist BEFORE assuming corruption. # If the table is empty, it's just an empty DB, not corruption. if not rows: raise NoDatabase() @@ -240,7 +248,7 @@ def try_decrypt_login(key, ciphertext, iv): text = res.decode('utf-8') if is_valid_text(text): return text, "AES-Standard" - except: + except Exception: pass # Try 3DES @@ -252,7 +260,7 @@ def try_decrypt_login(key, ciphertext, iv): text = res.decode('utf-8') if is_valid_text(text): return text, "3DES-Standard" - except: + except Exception: pass return None, None @@ -339,21 +347,56 @@ def exportLogins(key, jsonLogins): return logins -def lower_header(csv_file): - it = iter(csv_file) - yield next(it).lower() - yield from it +def readCSV(csv_file): + # Peek at the first line to detect if header exists + first_line = csv_file.readline() + logging.debug(f"first_line: {first_line}") + if not first_line: + logging.warning('WARN: Nothing detected, first line is blank.') + return [] + # Heuristic: if it contains both keys (at index=2,3), assume it's a header + is_header = ( + first_line[1].lower() in {"username", "uname", "user", "u"} + and first_line[2].lower() in {"password", "passwd", "pass", "p"} # noqa: W503 line break before binary operator + ) + + if is_header: + # Re-chain the first line, but ensure DictReader lowercases the header row + # by creating an iterator that lowercases the first item only + def normalized_iter(): + yield first_line.lower() + yield from csv_file + reader = csv.DictReader(normalized_iter()) + else: + # No header: Assume default order and provide fieldnames + reader = csv.reader(csv_file) -def readCSV(csv_file): logins = [] - reader = csv.DictReader(lower_header(csv_file)) for row in reader: - logins.append((rawURL(row["url"]), row["username"], row["password"])) + if isinstance(reader, csv.DictReader): + logging.error("DictReader it is!") + u = row.get("url", "") + n = row.get("username", "") + p = row.get("password", "") + else: + logging.error("RowReader it is!") + u, n, p = row + logins.append((rawURL(u), n, p)) + return logins def rawURL(url): + if not url: + return "" + + # Fix for schemeless URLs (e.g. "test.com" -> "https://test.com") + # Without a scheme, urlparse puts the whole string in 'path' and leaves 'netloc' empty. + # ffpass expects 'netloc' to be populated to strip paths. + if "://" not in url: + url = "https://" + url + p = urlparse(url) return type(p)(*p[:2], *[""] * 4).geturl() @@ -361,9 +404,9 @@ def rawURL(url): def addNewLogins(key, jsonLogins, logins): nextId = jsonLogins["nextId"] timestamp = int(datetime.now().timestamp() * 1000) - logging.info('adding logins') + logging.warning(f'adding {len(logins)} logins') for i, (url, username, password) in enumerate(logins, nextId): - logging.debug(f'adding {url} {username}') + logging.info(f'adding {url} {username}') entry = { "id": i, "hostname": url, @@ -383,6 +426,7 @@ def addNewLogins(key, jsonLogins, logins): jsonLogins["logins"].append(entry) jsonLogins["nextId"] += len(logins) + # Constants used to guess cross-platform PROFILE_GUESS_DIRS = { "darwin": "~/Library/Application Support/Firefox/Profiles", @@ -391,6 +435,7 @@ PROFILE_GUESS_DIRS = { "cygwin": os.path.expandvars(r"%LOCALAPPDATA%\Mozilla\Firefox\Profiles"), } + def getProfiles(): paths = Path(PROFILE_GUESS_DIRS[sys.platform]).expanduser() logging.debug(f"Paths: {paths}") @@ -454,14 +499,8 @@ def main_export(args): def main_import(args): - if args.file == sys.stdin: - try: - key = askpass(args.directory) - except WrongPassword: - logging.error("Password is not empty. You have to specify FROM_FILE.") - sys.exit(1) - else: - key = askpass(args.directory) + # askpass handles stdin/tty detection for the password prompt automatically + key = askpass(args.directory) if not key: logging.error("Failed to derive master key.") @@ -498,7 +537,7 @@ def makeParser(): ) for sub in subparsers.choices.values(): - sub.add_argument( + arg = sub.add_argument( "-p", # matches native: firefox -p "-d", "--directory", @@ -507,9 +546,11 @@ def makeParser(): metavar="DIRECTORY", default=None, help="Firefox profile directory", - # argcomplete - choices=getProfiles(), ) + # Use argcomplete completer instead of 'choices=' + # This allows arbitrary paths (tests) but gives users tab completion. + arg.completer = lambda **kwargs: [str(p) for p in getProfiles()] + try: pass except ImportError: @@ -532,36 +573,36 @@ def makeParser(): def main(): - - logging.basicConfig(level=logging.ERROR, format="%(message)s") + # default log level is warning + logging.basicConfig(level=logging.WARNING, format="%(message)s") parser = makeParser() args = parser.parse_args() - # Default level WARN (quiet), INFO for verbose, DEBUG for debug + # Adjust level based on flags if args.debug: logging.getLogger().setLevel(logging.DEBUG) elif args.verbose: logging.getLogger().setLevel(logging.INFO) - else: - logging.getLogger().setLevel(logging.WARNING) + # try to obtain profile directory if args.directory is None: try: args.directory = guessDir() except NoProfile: - logging.warning("No Firefox profile selected.") + logging.error("No Firefox profile selected.") parser.print_help() parser.exit() args.directory = args.directory.expanduser() + # run arg parser try: - # Wrap in try/except for BrokenPipeError to allow piping to head + # Wrap in try/except for BrokenPipeError to allow piping to head, i.e., ffpass export | head -5 try: args.func(args) except BrokenPipeError: sys.stdout = os.fdopen(1, 'w') - pass + except NoDatabase: logging.error("Firefox password database is empty.") -- 2.52.0