From 4b16d615e62e539ac4e3506f474b4f7ddba1b2a9 Mon Sep 17 00:00:00 2001 From: KyleF0X <96126274+KyleF0X@users.noreply.github.com> Date: Wed, 26 Feb 2025 00:05:13 +1000 Subject: [PATCH] Add files via upload Changed the ETA function in seed recovery so its calculated by MATH, not COUNTING, which can take hours/days. This way the user doesnt have to use --no-eta, and lose their ability to have an ETA, but doesnt have to waste time counting the number of permutations. --- btcrecover/btcrpass.py | 2861 +++++++++++++++++++++++++++++++++++++++- btcrecover/btcrseed.py | 184 ++- 2 files changed, 2971 insertions(+), 74 deletions(-) diff --git a/btcrecover/btcrpass.py b/btcrecover/btcrpass.py index be474577..1118a52c 100644 --- a/btcrecover/btcrpass.py +++ b/btcrecover/btcrpass.py @@ -25,6 +25,2772 @@ __version__ = "1.13.0-Cryptoguide" __ordering_version__ = b"0.6.4" # must be updated whenever password ordering changes disable_security_warnings = True +progress_bar_widgets_global = None # Will store custom progress bar widgets + +# Import modules included in standard libraries +import sys, argparse, itertools, string, re, multiprocessing, signal, os, pickle, gc, \ + time, timeit, hashlib, collections, base64, struct, atexit, zlib, math, json, numbers, datetime, binascii, gzip + +# Import modules bundled with BTCRecover +import btcrecover.opencl_helpers +import lib.cardano.cardano_utils as cardano +from lib.eth_hash.auto import keccak +from lib.mnemonic_btc_com_tweaked import Mnemonic +from lib import progressbar + +module_leveldb_available = False +try: + from lib.ccl_chrome_indexeddb import ccl_leveldb + + module_leveldb_available = True +except: + pass + +hashlib_ripemd160_available = False +# Enable functions that may not work for some standard libraries in some environments +try: + # this will work with micropython and python < 3.10 + # but will raise and exception if ripemd is not supported (python3.10, openssl 3) + hashlib.new('ripemd160') + hashlib_ripemd160_available = True + def ripemd160(msg): + return hashlib.new('ripemd160', msg).digest() +except: + # otherwise use pure python implementation + from lib.embit.py_ripemd160 import ripemd160 + +# Import modules from requirements.txt +from Crypto.Cipher import AES + +# Import optional modules +module_opencl_available = False +try: + from lib.opencl_brute import opencl + from lib.opencl_brute.opencl_information import opencl_information + import pyopencl + + module_opencl_available = True +except: + pass + +# Eth Keystore Libraries +module_eth_keyfile_available = False +try: + import eth_keyfile + + module_eth_keyfile_available = True +except: + pass + +# PY Crypto HD wallet module +py_crypto_hd_wallet_available = False +try: + import py_crypto_hd_wallet + + py_crypto_hd_wallet_available = True +except: + pass + +# Shamir-Mnemonic module +shamir_mnemonic_available = False +try: + import shamir_mnemonic + + from shamir_mnemonic.recovery import RecoveryState + from shamir_mnemonic.shamir import generate_mnemonics + from shamir_mnemonic.share import Share + from shamir_mnemonic.utils import MnemonicError + + import click + from click import style + + FINISHED = style("\u2713", fg="green", bold=True) + EMPTY = style("\u2717", fg="red", bold=True) + INPROGRESS = style("\u25cf", fg="yellow", bold=True) + + + def error(s: str) -> None: + click.echo(style("ERROR: ", fg="red") + s) + + shamir_mnemonic_available = True +except: + pass + +# Modules dependant on bitcoinutils +bitcoinutils_available = False +try: + import lib.block_io + bitcoinutils_available = True +except: + pass + +# Modules dependant on SJCL +sjcl_available = False +try: + from sjcl import SJCL + sjcl_available = True +except: + pass + +searchfailedtext = "\nAll possible passwords (as specified in your tokenlist or passwordlist) have been checked and none are correct for this wallet. You could consider trying again with a different password list or expanded tokenlist..." + +def load_customTokenWildcard(customTokenWildcardFile): + customTokenWildcards = [''] + if customTokenWildcardFile: + try: + customTokenWildcards_File = open(customTokenWildcardFile, "r", encoding="utf-8", errors='ignore') + customTokenWildcards_Lines = customTokenWildcards_File.readlines() + + for customTokenWildcard in customTokenWildcards_Lines: + customTokenWildcards.append(customTokenWildcard.strip()) + customTokenWildcards_File.close() + except Exception as e: + print(e) + return customTokenWildcards + +# Assemble and output some information about the current system and python environment. +def full_version(): + from struct import calcsize + return "btcrecover {} on Python {} {}-bit, {}-bit unicodes, {}-bit ints".format( + __version__, + ".".join(str(i) for i in sys.version_info[:3]), + calcsize(b"P") * 8, + sys.maxunicode.bit_length(), + sys.maxsize.bit_length() + 1 + ) + + +# One of these two is typically called relatively early by parse_arguments() +def enable_unicode_mode(): + global io, tstr, tstr_from_stdin, tchr + import locale, io + tstr = str + preferredencoding = locale.getpreferredencoding() + tstr_from_stdin = lambda s: s if isinstance(s, str) else str(s, preferredencoding) + tchr = chr +# + +################################### Configurables/Plugins ################################### +# wildcard sets, simple typo generators, and wallet support functions + + +# Recognized wildcard (e.g. %d, %a) types mapped to their associated sets +# of characters; used in expand_wildcards_generator() +# warning: these can't be the key for a wildcard set: digits 'i' 'b' '[' ',' ';' '-' '<' '>' +def init_wildcards(wildcard_custom_list_e = None, + wildcard_custom_list_f = None, + wildcard_custom_list_j = None, + wildcard_custom_list_k = None): + global wildcard_sets, wildcard_keys, wildcard_nocase_sets, wildcard_re, \ + custom_wildcard_cache, backreference_maps, backreference_maps_sha1 + # N.B. that tstr() will not convert string.*case to Unicode correctly if the locale has + # been set to one with a single-byte code page e.g. ISO-8859-1 (Latin1) or Windows-1252 + wildcard_sets = { + tstr("H") : tstr(string.hexdigits), + tstr("B") : tstr("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"), + tstr("d") : tstr(string.digits), + tstr("a") : tstr(string.ascii_lowercase), + tstr("A") : tstr(string.ascii_uppercase), + tstr("n") : tstr(string.ascii_lowercase + string.digits), + tstr("N") : tstr(string.ascii_uppercase + string.digits), + tstr("s") : tstr(" "), # space + tstr("l") : tstr("\n"), # line feed + tstr("r") : tstr("\r"), # carriage return + tstr("R") : tstr("\n\r"), # newline characters + tstr("t") : tstr("\t"), # tab + tstr("T") : tstr(" \t"), # space and tab + tstr("w") : tstr(" \r\n"), # space and newline characters + tstr("W") : tstr(" \r\n\t"), # space, newline, and tab + tstr("y") : tstr(string.punctuation), + tstr("Y") : tstr(string.digits + string.punctuation), + tstr("p") : tstr().join(map(tchr, range(33, 127))), # all ASCII printable characters except whitespace + tstr("P") : tstr().join(map(tchr, range(33, 127))) + tstr(" \r\n\t"), # as above, plus space, newline, and tab + tstr("q") : tstr().join(map(tchr, range(33, 127))) + tstr(" "), # all ASCII printable characters plus whitespace (All characters that are easily available for a Trezor Passphrase via keyboard or touchscreen entry) + tstr("U"): ''.join(chr(i) for i in range(65536)), # All possible 16 bit unicode characters + tstr("e"): load_customTokenWildcard(wildcard_custom_list_e), # %e and %f are special types of wildcards which can both be customised AND can occur multiple times, but always have the same value. (And can also include other types of wildcards) + tstr("f"): load_customTokenWildcard(wildcard_custom_list_f), + tstr("j"): load_customTokenWildcard(wildcard_custom_list_j), # %j and %k behave mostly like standard wildcards but can be entire words/strings and are loaded from a custom file + tstr("k"): load_customTokenWildcard(wildcard_custom_list_k), + # wildcards can be used to escape these special symbols + tstr("%") : tstr("%"), + tstr("^") : tstr("^"), + tstr("S") : tstr("$"), # the key is intentionally a capital "S", the value is a dollar sign + } + wildcard_keys = tstr().join(wildcard_sets) + # + # case-insensitive versions (e.g. %ia) of wildcard_sets for those which have them + wildcard_nocase_sets = { + tstr("a") : tstr(string.ascii_lowercase + string.ascii_uppercase), + tstr("A") : tstr(string.ascii_uppercase + string.ascii_lowercase), + tstr("n") : tstr(string.ascii_lowercase + string.ascii_uppercase + string.digits), + tstr("N") : tstr(string.ascii_uppercase + string.ascii_lowercase + string.digits) + } + # + wildcard_re = None + custom_wildcard_cache = dict() + backreference_maps = dict() + backreference_maps_sha1 = None + + +# Simple typo generators produce (as an iterable, e.g. a tuple, generator, etc.) +# zero or more alternative typo strings which can replace a single character. If +# more than one string is produced, all combinations are tried. If zero strings are +# produced (e.g. an empty tuple), then the specified input character has no typo +# alternatives that can be tried (e.g. you can't change the case of a caseless char). +# They are called with the full password and an index into that password of the +# character which will be replaced. +# +def typo_repeat(p, i): return 2 * p[i], # A single replacement of len 2. +def typo_delete(p, i): return tstr(""), # A single replacement of len 0. +def typo_case(p, i): # Returns a single replacement or no + swapped = p[i].swapcase() # replacement if it's a caseless char. + return (swapped,) if swapped != p[i] else () +# +def typo_closecase(p, i): # Returns a swapped case only when case transitions are nearby + cur_case_id = case_id_of(p[i]) # (case_id functions defined in the Password Generation section) + if cur_case_id == UNCASED_ID: return () + if i==0 or i+1==len(p) or \ + case_id_changed(case_id_of(p[i-1]), cur_case_id) or \ + case_id_changed(case_id_of(p[i+1]), cur_case_id): + return p[i].swapcase(), + return () +# +def typo_replace_wildcard(p, i): return [e for e in typos_replace_expanded if e != p[i]] + +def typo_map(p, i): + returnVal = "".join(list(typos_map.get(p[i], ()))) + return returnVal + +# (typos_replace_expanded and typos_map are initialized from args.typos_replace +# and args.typos_map respectively in parse_arguments() ) +# +# a dict: command line argument name is: "typos-" + key_name; associated value is +# the generator function from above; this dict MUST BE ORDERED to prevent the +# breakage of --skip and --restore features (the order can be arbitrary, but it +# MUST be repeatable across runs and preferably across implementations) +simple_typos = collections.OrderedDict() +simple_typos["repeat"] = typo_repeat +simple_typos["delete"] = typo_delete +simple_typos["case"] = typo_case +simple_typos["closecase"] = typo_closecase +simple_typos["replace"] = typo_replace_wildcard +simple_typos["map"] = typo_map +# +# a dict: typo name (matches typo names in the dict above) mapped to the options +# that are passed to add_argument; this dict is only ordered for cosmetic reasons +simple_typo_args = collections.OrderedDict() +simple_typo_args["repeat"] = dict( action="store_true", help="repeat (double) a character" ) +simple_typo_args["delete"] = dict( action="store_true", help="delete a character" ) +simple_typo_args["case"] = dict( action="store_true", help="change the case (upper/lower) of a letter" ) +simple_typo_args["closecase"] = dict( action="store_true", help="like --typos-case, but only change letters next to those with a different case") +simple_typo_args["map"] = dict( metavar="FILE", help="replace specific characters based on a map file" ) +simple_typo_args["replace"] = dict( metavar="WILDCARD-STRING", help="replace a character with another string or wildcard" ) + + +# A class decorator which adds a wallet class to the registered list +wallet_types = [] +wallet_types_by_id = {} +def register_wallet_class(cls): + global wallet_types, wallet_types_by_id + wallet_types.append(cls) + try: + assert cls.data_extract_id() not in wallet_types_by_id,\ + "register_wallet_class: registered wallet types must have unique data_extract_id's" + wallet_types_by_id[cls.data_extract_id()] = cls + except AttributeError: + pass + + return cls + +# Clears the current set of registered wallets (including those registered by default below) +def clear_registered_wallets(): + global wallet_types, wallet_types_by_id + wallet_types = [] + wallet_types_by_id = {} + + +# The max wallet file size in bytes (prevents trying to load huge files which clearly aren't wallets) +MAX_WALLET_FILE_SIZE = 64 * 2**20 # 64 MiB + +# Loads a wallet object and returns it (possibly for external libraries to use) +def load_wallet(wallet_filename): + # Ask each registered wallet type if the file might be of their type, + # and if so load the wallet + uncertain_wallet_types = [] + try: + with open(wallet_filename, "rb") as wallet_file: + for wallet_type in wallet_types: + found = wallet_type.is_wallet_file(wallet_file) + if found: + wallet_file.close() + return wallet_type.load_from_filename(wallet_filename) + elif found is None: # None means it might still be this type of wallet... + uncertain_wallet_types.append(wallet_type) + except PermissionError: #Metamask wallets can be a folder which may throw a PermissionError or IsADirectoryError + return WalletMetamask.load_from_filename(wallet_filename) + except IsADirectoryError: + return WalletMetamask.load_from_filename(wallet_filename) + + # If the wallet type couldn't be definitively determined, try each + # questionable type (which must raise ValueError on a load failure) + uncertain_errors = [] + for wallet_type in uncertain_wallet_types: + try: + return wallet_type.load_from_filename(wallet_filename) + except Exception as e: + uncertain_errors.append(wallet_type.__name__ + ": " + str(e)) + + error_exit("unrecognized wallet format" + + ("; heuristic parser(s) reported:\n " + "\n ".join(uncertain_errors) if uncertain_errors else "") ) + +# Loads a wallet object into the loaded_wallet global from a filename +def load_global_wallet(wallet_filename): + global loaded_wallet + loaded_wallet = load_wallet(wallet_filename) + +# Given a base64 string that was produced by one of the extract-* scripts, determines +# the wallet type and sets the loaded_wallet global to a corresponding wallet object +def load_from_base64_key(key_crc_base64): + global loaded_wallet + + try: key_crc_data = base64.b64decode(key_crc_base64) + except TypeError: error_exit("encrypted key data is corrupted (invalid base64)") + + # Check the CRC + if len(key_crc_data) < 8: + error_exit("encrypted key data is corrupted (too short)") + key_data = key_crc_data[:-4] + (key_crc,) = struct.unpack(b" max_local_ws: + error_exit("--local-ws of", local_ws[i], "exceeds max of", max_local_ws, "for GPU '"+device.name.strip()+"' with Bitcoin Core wallets") + + # Create one command queue and one I/O buffer per device + self._cl_queues = [] + self._cl_hashes_buffers = [] + for i, device in enumerate(devices): + self._cl_queues.append(pyopencl.CommandQueue(cl_context, device)) + # Each buffer is of len --global-ws * (size-of-sha512-hash-in-bytes == 512 bits / 8 == 64) + self._cl_hashes_buffers.append(pyopencl.Buffer(cl_context, pyopencl.mem_flags.READ_WRITE, global_ws[i] * 64)) + + # Doing all iter_count iterations at once will hang the GPU, so instead calculate how + # many iterations should be done at a time based on iter_count and the requested int_rate, + # rounding up to maximize the number of iterations done in the last set to optimize performance + assert hasattr(self, "_iter_count") and self._iter_count, "WalletBitcoinCore.init_opencl_kernel: bitcoin core wallet or mkey has been loaded" + self._iter_count_chunksize = self._iter_count // int_rate or 1 + if self._iter_count_chunksize % int_rate != 0: # if not evenly divisible, + self._iter_count_chunksize += 1 # then round up + + def _return_verified_password_or_false_gpu(self, arg_passwords): # Bitcoin Core (Legacy GPU) + assert len(arg_passwords) <= sum(self._cl_global_ws), "WalletBitcoinCore.return_verified_password_or_false_opencl: at most --global-ws passwords" + + # Convert Unicode strings to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + # The first iter_count iteration is done by the CPU + hashes = numpy.empty([sum(self._cl_global_ws), 64], numpy.uint8) + for i, password in enumerate(passwords): + hashes[i] = numpy.frombuffer(hashlib.sha512(password + self._salt).digest(), numpy.uint8) + + # Divide up and copy the starting hashes into the OpenCL buffer(s) (one per device) in parallel + done = [] # a list of OpenCL event objects + offset = 0 + for devnum, ws in enumerate(self._cl_global_ws): + done.append(pyopencl.enqueue_copy(self._cl_queues[devnum], self._cl_hashes_buffers[devnum], + hashes[offset : offset + ws], is_blocking=False)) + self._cl_queues[devnum].flush() # Starts the copy operation + offset += ws + pyopencl.wait_for_events(done) + + # Doing all iter_count iterations at once will hang the GPU, so instead do iter_count_chunksize + # iterations at a time, pausing briefly while waiting for them to complete, and then continuing. + # Because iter_count is probably not evenly divisible by iter_count_chunksize, the loop below + # performs all but the last of these iter_count_chunksize sets of iterations. + + i = 1 - self._iter_count_chunksize # used if the loop below doesn't run (when --int-rate == 1) + for i in range(1, self._iter_count - self._iter_count_chunksize, self._iter_count_chunksize): + done = [] # a list of OpenCL event objects + # Start up a kernel for each device to do one set of iter_count_chunksize iterations + for devnum in range(len(self._cl_devices)): + done.append(self._cl_kernel(self._cl_queues[devnum], (self._cl_global_ws[devnum],), + None if self._cl_local_ws[devnum] is None else (self._cl_local_ws[devnum],), + self._cl_hashes_buffers[devnum], self._iter_count_chunksize)) + self._cl_queues[devnum].flush() # Starts the kernel + pyopencl.wait_for_events(done) + + # Perform the last remaining set of iterations (usually fewer then iter_count_chunksize) + done = [] # a list of OpenCL event objects + for devnum in range(len(self._cl_devices)): + done.append(self._cl_kernel(self._cl_queues[devnum], (self._cl_global_ws[devnum],), + None if self._cl_local_ws[devnum] is None else (self._cl_local_ws[devnum],), + self._cl_hashes_buffers[devnum], self._iter_count - self._iter_count_chunksize - i)) + self._cl_queues[devnum].flush() # Starts the kernel + pyopencl.wait_for_events(done) + + # Copy the resulting fully computed hashes back to RAM in parallel + done = [] # a list of OpenCL event objects + offset = 0 + for devnum, ws in enumerate(self._cl_global_ws): + done.append(pyopencl.enqueue_copy(self._cl_queues[devnum], hashes[offset : offset + ws], + self._cl_hashes_buffers[devnum], is_blocking=False)) + offset += ws + self._cl_queues[devnum].flush() # Starts the copy operation + pyopencl.wait_for_events(done) + + # Convert Unicode strings to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + # Using the computed hashes, try to decrypt the master key (in CPU) + for i, password in enumerate(passwords): + derived_key = hashes[i].tobytes() + part_master_key = aes256_cbc_decrypt(derived_key[:32], self._part_encrypted_master_key[:16], self._part_encrypted_master_key[16:]) + # If the last block (bytes 16-31) of part_encrypted_master_key is all padding, we've found it + if part_master_key == b"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10": + return password.decode("utf_8", "replace"), i + 1 + return False, i + 1 + + +@register_wallet_class +class WalletPywallet(WalletBitcoinCore): + + def data_extract_id(): + return False # there is none + + @staticmethod + def is_wallet_file(wallet_file): return None # there's no easy way to check this + + # Load a Bitcoin Core encrypted master key from a file created by pywallet.py --dumpwallet + @classmethod + def load_from_filename(cls, wallet_filename): + # pywallet dump files are largish json files often preceded by a bunch of error messages; + # search through the file in 16k blocks looking for a particular string which occurs twice + # inside the mkey object we need (because it appears twice, we're guaranteed one copy + # will appear whole in at least one block even if the other is split across blocks). + # + # For the first block, give up if this doesn't look like a text file + with open(wallet_filename) as wallet_file: + last_block = "" + cur_block = wallet_file.read(16384) + if sum(1 for c in cur_block if ord(c)>126 or ord(c)==0) > 512: # about 3% + raise ValueError("Unrecognized pywallet format (does not look like ASCII text)") + while cur_block: + found_at = cur_block.find('"nDerivation') + if found_at >= 0: break + last_block = cur_block + cur_block = wallet_file.read(16384) + else: + raise ValueError("Unrecognized pywallet format (can't find mkey)") + + cur_block = last_block + cur_block + wallet_file.read(4096) + found_at = cur_block.rfind("{", 0, found_at + len(last_block)) + if found_at < 0: + raise ValueError("Unrecognized pywallet format (can't find mkey opening brace)") + wallet = json.JSONDecoder().raw_decode(cur_block[found_at:])[0] + + if not all(name in wallet for name in ("nDerivationIterations", "nDerivationMethod", "nID", "salt")): + raise ValueError("Unrecognized pywallet format (can't find all mkey attributes)") + + if wallet["nID"] != 1: + raise NotImplementedError("Unsupported Bitcoin Core wallet ID " + wallet["nID"]) + if wallet["nDerivationMethod"] != 0: + raise NotImplementedError("Unsupported Bitcoin Core key derivation method " + wallet["nDerivationMethod"]) + + if "encrypted_key" in wallet: + encrypted_master_key = wallet["encrypted_key"] + elif "crypted_key" in wallet: + encrypted_master_key = wallet["crypted_key"] + else: + raise ValueError("Unrecognized pywallet format (can't find [en]crypted_key attribute)") + + # These are the same as retrieved and saved by load_bitcoincore_wallet() + self = cls(loading=True) + encrypted_master_key = base64.b16decode(encrypted_master_key, casefold=True) + self._salt = base64.b16decode(wallet["salt"], True) + self._iter_count = int(wallet["nDerivationIterations"]) + + if len(encrypted_master_key) != 48: raise NotImplementedError("Unsupported encrypted master key length") + if len(self._salt) != 8: raise NotImplementedError("Unsupported salt length") + if self._iter_count <= 0: raise NotImplementedError("Unsupported iteration count") + + # only need the final 2 encrypted blocks (half of it padding) plus the salt and iter_count saved above + self._part_encrypted_master_key = encrypted_master_key[-32:] + return self + + +############### MultiBit ############### +# - MultiBit .key backup files +# - MultiDoge .key backup files +# - Bitcoin Wallet for Android/BlackBerry v3.47+ wallet backup files +# - Bitcoin Wallet for Android/BlackBerry v2.24 and older key backup files +# - Bitcoin Wallet for Android/BlackBerry v2.3 - v3.46 key backup files +# - KnC for Android key backup files (same as the above) + +@register_wallet_class +class WalletMultiBit(object): + opencl_algo = -1 + _dump_privkeys_file = None + + def data_extract_id(): + return "mb" + + # MultiBit private key backup file (not the wallet file) + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + try: + base64EncodedData = wallet_file.read(20).lstrip()[:12] + data = base64.b64decode(base64EncodedData) + except binascii.Error: return False + return data.startswith(b"Salted__") + + def __init__(self, loading = False): + assert loading, 'use load_from_* to create a ' + self.__class__.__name__ + aes_library_name = load_aes256_library().__name__ + self._passwords_per_second = 100000 if aes_library_name == "Crypto" else 5000 + + def __setstate__(self, state): + # (re-)load the required libraries after being unpickled + load_aes256_library(warnings=False) + self.__dict__ = state + + # This just dumps the wallet private keys for Android Wallets + def dump_privkeys(self, wallet_data): + with open(self._dump_privkeys_file, 'a') as logfile: + from . import bitcoinj_pb2 + global pylibscrypt + import lib.pylibscrypt as pylibscrypt + pad_len = wallet_data[-1] + if isinstance(pad_len, str): + pad_len = ord(pad_len) + + # Attempt to dump the menemonic from the wallet (standard BitcoinJ file) + pbdata = wallet_data[:-pad_len] + pb_wallet = bitcoinj_pb2.Wallet() + pb_wallet.ParseFromString(pbdata) + mnemonic = WalletBitcoinj.extract_mnemonic(pb_wallet) + logfile.write("Android Wallet Mnemonic: '" + mnemonic.decode() + "' derivation path: m/0'") + + # This just dumps the wallet private keys for Multibit Classic, Multidoge Wallets + def dump_privkeys_keybackup(self, key1, key2, iv): + with open(self._dump_privkeys_file, 'a') as logfile: + decrypted_wallet = aes256_cbc_decrypt(key1 + key2, iv, self._encrypted_wallet).decode().splitlines() + for line in decrypted_wallet: + try: + key, date = line.split(" ") + logfile.write(key + "\n") + except: + pass + + def passwords_per_seconds(self, seconds): + return max(int(round(self._passwords_per_second * seconds)), 1) + + # Load a Multibit private key backup file (the part of it we need) + @classmethod + def load_from_filename(cls, privkey_filename): + with open(privkey_filename) as privkey_file: + # Multibit privkey files contain base64 text split into multiple lines; + # we need the first 48 bytes after decoding, which translates to 64 before. + data = "".join(privkey_file.read().split()) # join multiple lines into one + + if len(data) < 64: raise EOFError("Expected at least 64 bytes of text in the MultiBit private key file") + data = base64.b64decode(data) + assert data.startswith(b"Salted__"), "WalletBitcoinCore.load_from_filename: file starts with base64 'Salted__'" + if len(data) < 48: raise EOFError("Expected at least 48 bytes of decoded data in the MultiBit private key file") + self = cls(loading=True) + self._encrypted_block = data[16:48] # the first two 16-byte AES blocks + self._encrypted_wallet = data[16:] + self._salt = data[8:16] + return self + + # Import a MultiBit private key that was extracted by extract-multibit-privkey.py + @classmethod + def load_from_data_extract(cls, privkey_data): + assert len(privkey_data) == 24 + print("WARNING: read the Usage for MultiBit Classic section of Extract_Scripts.md before proceeding", file=sys.stderr) + self = cls(loading=True) + self._encrypted_block = privkey_data[8:] # a single 16-byte AES block + self._salt = privkey_data[:8] + return self + + def difficulty_info(self): + return "3 MD5 iterations" + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + assert b"1" < b"9" < b"A" < b"Z" < b"a" < b"z" # the b58 check below assumes ASCII ordering in the interest of speed + def return_verified_password_or_false(self, orig_passwords): # Multibit + # Copy a few globals into local for a small speed boost + l_md5 = hashlib.md5 + l_aes256_cbc_decrypt = aes256_cbc_decrypt + encrypted_block = self._encrypted_block + salt = self._salt + + # Convert Unicode strings (lazily) to UTF-16 bytestrings, truncating each code unit to 8 bits + passwords = map(lambda p: p.encode("utf_16_le", "ignore")[::2], orig_passwords) + + for count, password in enumerate(passwords, 1): + salted = password + salt + key1 = l_md5(salted).digest() + key2 = l_md5(key1 + salted).digest() + iv = l_md5(key2 + salted).digest() + b58_privkey = l_aes256_cbc_decrypt(key1 + key2, iv, encrypted_block[:16]) + + # (all this may be fragile, e.g. what if comments or whitespace precede what's expected in future versions?) + if type(b58_privkey) == str: + b58_privkey = b58_privkey.encode() + if chr(b58_privkey[0]) in "LK5Q\x0a#": + # + # Does it look like a base58 private key (MultiBit, MultiDoge, or oldest-format Android key backup)? + if b58_privkey[0] in "LK5Q".encode(): # private keys always start with L, K, or 5, or for MultiDoge Q + for c in b58_privkey[1:]: + # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z], break + if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": + break + # If the loop above doesn't break, it's base58-looking so far + else: + # If another AES block is available, decrypt and check it as well to avoid false positives + if len(encrypted_block) >= 32: + b58_privkey = l_aes256_cbc_decrypt(key1 + key2, encrypted_block[:16], encrypted_block[16:32]) + for c in b58_privkey: + if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": + break # not base58 + # If the loop above doesn't break, it's base58; we've found it + else: + if self._dump_privkeys_file: + self.dump_privkeys_keybackup(key1, key2, iv) + return orig_passwords[count-1], count + else: + # (when no second block is available, there's a 1 in 300 billion false positive rate here) + if self._dump_privkeys_file: + self.dump_privkeys_keybackup(key1, key2, iv) + return orig_passwords[count - 1], count + # + # Does it look like a bitcoinj protobuf (newest Bitcoin for Android backup) + elif b58_privkey[2:6] == b"org." and b58_privkey[0] == 10 and b58_privkey[1] < 128: + for c in b58_privkey[6:14]: + # If it doesn't look like a lower alpha domain name of len >= 8 (e.g. 'bitcoin.'), break + if c > ord("z") or (c < ord("a") and c != ord(".")): + break + # If the loop above doesn't break, it looks like a domain name; we've found it + else: + print("Notice: Found Bitcoin for Android Wallet Password") + if self._dump_privkeys_file: + #try: + if True: + wallet_data = l_aes256_cbc_decrypt(key1 + key2, iv, self._encrypted_wallet) + self.dump_privkeys(wallet_data) + #except: + # print("Unable to decode wallet mnemonic (common for very old wallets)") + + return orig_passwords[count - 1], count + # + # Does it look like a KnC for Android key backup? + elif b58_privkey == b"# KEEP YOUR PRIV": + if isinstance(orig_passwords[count-1],str): + return orig_passwords[count-1], count + if isinstance(orig_passwords[count - 1], bytes): + return orig_passwords[count-1].decode(), count + + return False, count + +############### bitcoinj ############### + +# A namedtuple with the same attributes as the protobuf message object from bitcoinj_pb2 +# (it's a global so that it's pickleable) +EncryptionParams = collections.namedtuple("EncryptionParams", "salt n r p") + +@register_wallet_class +class WalletBitcoinj(object): + opencl_algo = -1 + _dump_privkeys_file = None + + def data_extract_id(): + return "bj" + + def passwords_per_seconds(self, seconds): + passwords_per_second = self._passwords_per_second + if hasattr(self, "_scrypt_n"): + passwords_per_second /= self._scrypt_n / 16384 # scaled by default N + passwords_per_second /= self._scrypt_r / 8 # scaled by default r + passwords_per_second /= self._scrypt_p / 1 # scaled by default p + return max(int(round(passwords_per_second * seconds)), 1) + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + if wallet_file.read(1) == b"\x0a": # protobuf field number 1 of type length-delimited + network_identifier_len = ord(wallet_file.read(1)) + if 1 <= network_identifier_len < 128: + wallet_file.seek(2 + network_identifier_len) + c = wallet_file.read(1) + if c and c in b"\x12\x1a": # field number 2 or 3 of type length-delimited + return True + return False + + # From https://github.com/gurnec/decrypt_bitcoinj_seed + @staticmethod + def extract_mnemonic(pb_wallet, password = None): + from . import bitcoinj_pb2 + """extract and if necessary decrypt (w/scrypt) a BIP39 mnemonic from a bitcoinj wallet protobuf + + :param pb_wallet: a Wallet protobuf message + :type pb_wallet: wallet_pb2.Wallet + :param get_password_fn: a callback returning a password that's called iff one is required + :type get_password_fn: function + :return: the first BIP39 mnemonic found in the wallet or None if no password was entered when required + :rtype: str + """ + for key in pb_wallet.key: + if key.type == bitcoinj_pb2.Key.DETERMINISTIC_MNEMONIC: + + if key.HasField('secret_bytes'): # if not encrypted + return key.secret_bytes + + elif key.HasField('encrypted_data'): # if encrypted (w/scrypt) + # Derive the encryption key + aes_key = pylibscrypt.scrypt( + password, + pb_wallet.encryption_parameters.salt, + pb_wallet.encryption_parameters.n, + pb_wallet.encryption_parameters.r, + pb_wallet.encryption_parameters.p, + 32) + + # Decrypt the mnemonic + ciphertext = key.encrypted_data.encrypted_private_key + iv = key.encrypted_data.initialisation_vector + return aes256_cbc_decrypt(aes_key, iv, ciphertext).decode().replace("\\t", "") + + else: # if the loop exists normally, no mnemonic was found + raise ValueError('no BIP39 mnemonic found') + + def __init__(self, loading = False): + assert loading, 'use load_from_* to create a ' + self.__class__.__name__ + global pylibscrypt + import lib.pylibscrypt as pylibscrypt + # This is the base estimate for the scrypt N,r,p defaults of 16384,8,1 + if not pylibscrypt._done: + print("Warning: can't find an scrypt library, performance will be severely degraded", file=sys.stderr) + self._passwords_per_second = 0.03 + else: + self._passwords_per_second = 14 + load_aes256_library() + + def __setstate__(self, state): + # (re-)load the required libraries after being unpickled + global pylibscrypt + import lib.pylibscrypt as pylibscrypt + load_aes256_library(warnings=False) + self.__dict__ = state + + # Load a bitcoinj wallet file (the part of it we need) + @classmethod + def load_from_filename(cls, wallet_filename): + with open(wallet_filename, "rb") as wallet_file: + filedata = wallet_file.read(MAX_WALLET_FILE_SIZE) # up to 64M, typical size is a few k + return cls._load_from_filedata(filedata) + + @classmethod + def _load_from_filedata(cls, filedata): + try: + from . import bitcoinj_pb2 + except ModuleNotFoundError: + print("Warning: Cannot load protobuf module, unable to check if this is a Coinomi wallet" + "... Be sure to install all requirements with the command 'pip3 install -r requirements.txt', see https://btcrecover.readthedocs.io/en/latest/INSTALL/") + + pb_wallet = bitcoinj_pb2.Wallet() + pb_wallet.ParseFromString(filedata) + + if pb_wallet.encryption_type == bitcoinj_pb2.Wallet.UNENCRYPTED: + print("\nWallet Not Encrypted, Contains the following Private Keys") + for key in pb_wallet.key: + from lib.cashaddress import base58 + privkey_wif = base58.b58encode_check(bytes([0x80]) + key.secret_bytes + bytes([0x1])) + print(privkey_wif) + print() + raise ValueError("bitcoinj wallet is not encrypted") + if pb_wallet.encryption_type != bitcoinj_pb2.Wallet.ENCRYPTED_SCRYPT_AES: + raise NotImplementedError("Unsupported bitcoinj encryption type "+str(pb_wallet.encryption_type)) + if not pb_wallet.HasField("encryption_parameters"): + raise ValueError("bitcoinj wallet is missing its scrypt encryption parameters") + + for key in pb_wallet.key: + if key.type in (bitcoinj_pb2.Key.ENCRYPTED_SCRYPT_AES, bitcoinj_pb2.Key.DETERMINISTIC_KEY) and key.HasField("encrypted_data"): + encrypted_len = len(key.encrypted_data.encrypted_private_key) + if encrypted_len == 48: + # only need the final 2 encrypted blocks (half of it padding) plus the scrypt parameters + self = cls(loading=True) + self._part_encrypted_key = key.encrypted_data.encrypted_private_key[-32:] + self._scrypt_salt = pb_wallet.encryption_parameters.salt + self._scrypt_n = pb_wallet.encryption_parameters.n + self._scrypt_r = pb_wallet.encryption_parameters.r + self._scrypt_p = pb_wallet.encryption_parameters.p + self.pb_wallet_filedata = filedata + return self + print("Warning: ignoring encrypted key of unexpected length ("+str(encrypted_len)+")", file=sys.stderr) + + raise ValueError("No encrypted keys found in bitcoinj wallet") + + # Import a bitcoinj private key that was extracted by extract-bitcoinj-privkey.py + @classmethod + def load_from_data_extract(cls, privkey_data): + self = cls(loading=True) + # The final 2 encrypted blocks + self._part_encrypted_key = privkey_data[:32] + # The scrypt parameters + self._scrypt_salt = privkey_data[32:40] + (self._scrypt_n, self._scrypt_r, self._scrypt_p) = struct.unpack(b"< I H H", privkey_data[40:]) + return self + + def difficulty_info(self): + return "scrypt N, r, p = {}, {}, {}".format(self._scrypt_n, self._scrypt_r, self._scrypt_p) + + def dump_privkeys(self, derived_key): + from . import bitcoinj_pb2 + pb_wallet = bitcoinj_pb2.Wallet() + pb_wallet.ParseFromString(self.pb_wallet_filedata) + + from lib.cashaddress import base58 + with open(self._dump_privkeys_file, 'a') as logfile: + for key in pb_wallet.key: + privkey = aes256_cbc_decrypt(derived_key, key.encrypted_data.initialisation_vector, + key.encrypted_data.encrypted_private_key)[:32] + privkey_wif = base58.b58encode_check(bytes([0x80]) + privkey + bytes([0x1])) + logfile.write(privkey_wif + "\n") + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def return_verified_password_or_false(self, passwords): # Bitcoinj + # Copy a few globals into local for a small speed boost + l_scrypt = pylibscrypt.scrypt + l_aes256_cbc_decrypt = aes256_cbc_decrypt + part_encrypted_key = self._part_encrypted_key + scrypt_salt = self._scrypt_salt + scrypt_n = self._scrypt_n + scrypt_r = self._scrypt_r + scrypt_p = self._scrypt_p + + + # Convert strings (lazily) to UTF-16BE bytestrings + passwords = map(lambda p: p.encode("utf_16_be", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + derived_key = l_scrypt(password, scrypt_salt, scrypt_n, scrypt_r, scrypt_p, 32) + part_key = l_aes256_cbc_decrypt(derived_key, part_encrypted_key[:16], part_encrypted_key[16:]) + # If the last block (bytes 16-31) of part_encrypted_key is all padding, we've found it + if part_key == b"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10": + password = password.decode("utf_16_be", "replace") + if self._dump_privkeys_file: + self.dump_privkeys(derived_key) + + return password, count + + return False, count + + + +############### Coinomi ############### + +# A namedtuple with the same attributes as the protobuf message object from coinomi_pb2 +# (it's a global so that it's pickleable) +EncryptionParams = collections.namedtuple("EncryptionParams", "salt n r p") + +@register_wallet_class +class WalletCoinomi(WalletBitcoinj): + opencl_algo = -1 + _using_extract = False + _dump_privkeys_file = None + + def data_extract_id(): + return "cn" + + # This just dumps the wallet private keys + def dump_privkeys(self, derived_key): + with open(self._dump_privkeys_file, 'a') as logfile: + logfile.write("Private Keys (BIP39 seed and BIP32 Root Key) are below...\n") + mnemonic = aes256_cbc_decrypt(derived_key, self._mnemonic_iv, self._mnemonic) + mnemonic = mnemonic.decode()[:-1] + if mnemonic[-2:-1] != b'\x0c': + mnemonic = mnemonic.replace('\x0c', "") + " (BIP39 Passphrase In Use, if you don't have it use BIP32 root key to recover wallet)" + + logfile.write("BIP39 Mnemonic: " + mnemonic + "\n") + master_key = aes256_cbc_decrypt(derived_key, self._masterkey_encrypted_iv, self._masterkey_encrypted) + from lib.cashaddress import convert, base58 + xprv = base58.b58encode_check( + b'\x04\x88\xad\xe4\x00\x00\x00\x00\x00\x00\x00\x00\x00' + self._masterkey_chaincode + b'\x00' + master_key[ + :-16]) + logfile.write("\nBIP32 Root Key: " + xprv) + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + if wallet_file.read(1) == b'\x08': # protobuf field number 1 of type length-delimited + wallet_file.read(1) + if wallet_file.read(1) == b'\x12': + try: + from . import coinomi_pb2 + except ModuleNotFoundError: + exit( + "\nERROR: Cannot load protobuf module... Be sure to install all requirements with the command 'pip3 install -r requirements.txt', see https://btcrecover.readthedocs.io/en/latest/INSTALL/") + + try: + wallet_file.seek(0) + pb_wallet = coinomi_pb2.Wallet() + pb_wallet.ParseFromString(wallet_file.read()) + pockets = pb_wallet.pockets # Pockets is a fairly unique coinomi key... #This will certainly fail on non-coinomi protobuf wallets in Python 3.9+ + return True + except: + pass + + return False + + @classmethod + def _load_from_filedata(cls, filedata): + try: + from . import coinomi_pb2 + except ModuleNotFoundError: + exit( + "\nERROR: Cannot load protobuf module... Be sure to install all requirements with the command 'pip3 install -r requirements.txt', see https://btcrecover.readthedocs.io/en/latest/INSTALL/") + + pb_wallet = coinomi_pb2.Wallet() + pb_wallet.ParseFromString(filedata) + if pb_wallet.encryption_type == coinomi_pb2.Wallet.UNENCRYPTED: + raise ValueError("Coinomi wallet is not encrypted") + if pb_wallet.encryption_type != coinomi_pb2.Wallet.ENCRYPTED_SCRYPT_AES: + raise NotImplementedError("Unsupported Coinomi wallet encryption type "+str(pb_wallet.encryption_type)) + if not pb_wallet.HasField("encryption_parameters"): + raise ValueError("Coinomi wallet is missing its scrypt encryption parameters") + + # only need the final 2 encrypted blocks (half of it padding) plus the scrypt parameters + self = cls(loading=True) + self._encrypted_masterkey_part = pb_wallet.master_key.encrypted_data.encrypted_private_key[-32:] + self._scrypt_salt = pb_wallet.encryption_parameters.salt + self._scrypt_n = pb_wallet.encryption_parameters.n + self._scrypt_r = pb_wallet.encryption_parameters.r + self._scrypt_p = pb_wallet.encryption_parameters.p + self._mnemonic = pb_wallet.seed.encrypted_data.encrypted_private_key + self._mnemonic_iv = pb_wallet.seed.encrypted_data.initialisation_vector + self._masterkey_encrypted = pb_wallet.master_key.encrypted_data.encrypted_private_key + self._masterkey_encrypted_iv = pb_wallet.master_key.encrypted_data.initialisation_vector + self._masterkey_chaincode = pb_wallet.master_key.deterministic_key.chain_code + self._masterkey_pubkey = pb_wallet.master_key.public_key + return self + + # Import a bitcoinj private key that was extracted by extract-bitcoinj-privkey.py + @classmethod + def load_from_data_extract(cls, privkey_data): + self = cls(loading=True) + # The final 2 encrypted blocks + self._encrypted_masterkey_part = privkey_data[:32] + # The scrypt parameters + self._scrypt_salt = privkey_data[32:40] + (self._scrypt_n, self._scrypt_r, self._scrypt_p) = struct.unpack(b"< I H H", privkey_data[40:]) + self._using_extract = True + return self + + def difficulty_info(self): + return "scrypt N, r, p = {}, {}, {}".format(self._scrypt_n, self._scrypt_r, self._scrypt_p) + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def return_verified_password_or_false(self, passwords): # Bitcoinj + # Copy a few globals into local for a small speed boost + l_scrypt = pylibscrypt.scrypt + l_aes256_cbc_decrypt = aes256_cbc_decrypt + _encrypted_masterkey_part = self._encrypted_masterkey_part + scrypt_salt = self._scrypt_salt + scrypt_n = self._scrypt_n + scrypt_r = self._scrypt_r + scrypt_p = self._scrypt_p + + # Convert strings (lazily) to UTF-16BE bytestrings + passwords = map(lambda p: p.encode("utf_16_be", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + derived_key = l_scrypt(password, scrypt_salt, scrypt_n, scrypt_r, scrypt_p, 32) + part_key = l_aes256_cbc_decrypt(derived_key, _encrypted_masterkey_part[:16], _encrypted_masterkey_part[16:]) + + # If the last block (bytes 16-31) of part_encrypted_key is all padding, we've found it + if part_key == b"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10": + if not self._using_extract and self._dump_privkeys_file: + self.dump_privkeys(derived_key) + + password = password.decode("utf_16_be", "replace") + return password, count + + return False, count + + +############### MultiBit HD ############### + +@register_wallet_class +class WalletMultiBitHD(WalletBitcoinj): + _dump_privkeys_file = None + + def data_extract_id(): + return "m5" + # id "m2", which *only* supported MultiBit HD prior to v0.5.0 ("m5" supports + # both before and after), is no longer supported as of btcrecover version 0.15.7 + + # This just dumps the wallet private keys + def dump_privkeys(self, derived_key, password): + with open(self._dump_privkeys_file, 'a') as logfile: + decrypted_data = aes256_cbc_decrypt(derived_key, self._iv, self._encrypted_data) + padding_len = decrypted_data[-1] + + from . import bitcoinj_pb2 + pb_wallet = bitcoinj_pb2.Wallet() + pb_wallet.ParseFromString(decrypted_data[:-padding_len]) + mnemonic = WalletBitcoinj.extract_mnemonic(pb_wallet, password) + logfile.write("BIP39 Seed: " + mnemonic) + + @staticmethod + def is_wallet_file(wallet_file): return None # there's no easy way to check this + + # Load a MultiBit HD wallet file (the part of it we need) + @classmethod + def load_from_filename(cls, wallet_filename): + # MultiBit HD wallet files look like completely random bytes, so we + # require that its name remain unchanged in order to "detect" it + if os.path.basename(wallet_filename) != "mbhd.wallet.aes": + raise ValueError("MultiBit HD wallet files must be named mbhd.wallet.aes") + + with open(wallet_filename, "rb") as wallet_file: + encrypted_data = wallet_file.read() + if len(encrypted_data) < 32: + raise ValueError("MultiBit HD wallet files must be at least 32 bytes long") + + # The likelihood of of finding a valid encrypted MultiBit HD wallet whose first 16,384 + # bytes have less than 7.8 bits of entropy per byte is... too small for me to figure out + entropy_bits = est_entropy_bits(encrypted_data) + if entropy_bits < 7.8: + raise ValueError("Doesn't look random enough to be an encrypted MultiBit HD wallet (only {:.1f} bits of entropy per byte)".format(entropy_bits)) + + self = cls(loading=True) + self._iv = encrypted_data[:16] # the AES initialization vector (v0.5.0+) + self._encrypted_block_iv = encrypted_data[16:32] # the first 16-byte encrypted block (v0.5.0+) + self._encrypted_block_noiv = encrypted_data[:16] # the first 16-byte encrypted block w/hardcoded IV (< v0.5.0) + self._encrypted_data = encrypted_data[16:] # Encrypted Data + self._encrypted_data_noiv = encrypted_data # Encrypted Data w/hardcoded IV (< v0.5.0) + return self + + # Import a MultiBit HD encrypted block that was extracted by extract-multibit-hd-data.py + @classmethod + def load_from_data_extract(cls, file_data): + self = cls(loading=True) + assert len(file_data) == 32 + self._iv = file_data[:16] # the AES initialization vector (v0.5.0+) + self._encrypted_block_iv = file_data[16:] # the first 16-byte encrypted block (v0.5.0+) + self._encrypted_block_noiv = file_data[:16] # the first 16-byte encrypted block w/hardcoded IV (< v0.5.0) + return self + + def difficulty_info(self): + return "scrypt N, r, p = 16384, 8, 1" + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def return_verified_password_or_false(self, passwords): # MultibitHD + # Copy a few globals into local for a small speed boost + l_scrypt = pylibscrypt.scrypt + l_aes256_cbc_decrypt = aes256_cbc_decrypt + iv = self._iv + encrypted_block_iv = self._encrypted_block_iv + encrypted_block_noiv = self._encrypted_block_noiv + + # Convert strings (lazily) to UTF-16BE bytestrings + passwords = map(lambda p: p.encode("utf_16_be", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + derived_key = l_scrypt(password, b'\x35\x51\x03\x80\x75\xa3\xb0\xc5', olen=32) # w/a hardcoded salt + block_iv = l_aes256_cbc_decrypt(derived_key, iv, encrypted_block_iv) # v0.5.0+ + block_noiv = l_aes256_cbc_decrypt( # < v0.5.0 + derived_key, + b'\xa3\x44\x39\x1f\x53\x83\x11\xb3\x29\x54\x86\x16\xc4\x89\x72\x3e', # the hardcoded iv + encrypted_block_noiv) + # + # Does it look like a bitcoinj protobuf file? + # (there's a 1 in 2 trillion chance this hits but the password is wrong) + for block in (block_iv, block_noiv): + if block[2:6] == b"org." and block[0] == 10 and block[1] < 128: + if self._dump_privkeys_file: + self.dump_privkeys(derived_key, password) + + password = password.decode("utf_16_be", "replace") + return password, count + + return False, count + + +############### Android Spending PIN ############### + +# don't @register_wallet_class -- it's never auto-detected and never used for a --data-extract +class WalletAndroidSpendingPIN(WalletBitcoinj): + + # Decrypt a Bitcoin Wallet for Android/BlackBerry backup into a standard bitcoinj wallet, and load it + @classmethod + def load_from_filename(cls, wallet_filename, password = None, force_purepython = False): + with open(wallet_filename, "rb") as wallet_file: + # If we're given an unencrypted backup, just return a WalletBitcoinj + if WalletBitcoinj.is_wallet_file(wallet_file): + wallet_file.close() + return WalletBitcoinj.load_from_filename(wallet_filename) + + wallet_file.seek(0) + data = wallet_file.read(MAX_WALLET_FILE_SIZE) # up to 64M, typical size is a few k + + data = data.replace(b"\r", b"").replace(b"\n", b"") + data = base64.b64decode(data) + if not data.startswith(b"Salted__"): + raise ValueError("Not a Bitcoin Wallet for Android/BlackBerry encrypted backup (missing 'Salted__')") + if len(data) < 32: + raise EOFError ("Expected at least 32 bytes of decoded data in the encrypted backup file") + if len(data) % 16 != 0: + raise ValueError("Not a valid Bitcoin Wallet for Android/BlackBerry encrypted backup (size not divisible by 16)") + salt = data[8:16] + data = data[16:] + + if not password: + password = prompt_unicode_password( + "Please enter the password for the Bitcoin Wallet for Android/BlackBerry backup: ", + "encrypted Bitcoin Wallet for Android/BlackBerry backups must be decrypted before searching for the PIN") + # Convert Unicode string to a UTF-16 bytestring, truncating each code unit to 8 bits + password = password.encode("utf_16_le", "ignore")[::2] + + # Decrypt the backup file (OpenSSL style) + load_aes256_library(force_purepython) + salted = password + salt + key1 = hashlib.md5(salted).digest() + key2 = hashlib.md5(key1 + salted).digest() + iv = hashlib.md5(key2 + salted).digest() + data = aes256_cbc_decrypt(key1 + key2, iv, data) + #from cStringIO import StringIO + if not WalletBitcoinj.is_wallet_file(io.BytesIO(data[:100])): + error_exit("can't decrypt wallet (wrong password?)") + # Validate and remove the PKCS7 padding + padding_len = data[-1] + if not (1 <= padding_len <= 16 and data.endswith((chr(padding_len) * padding_len).encode())): + error_exit("can't decrypt wallet, invalid padding (wrong password?)") + + return cls._load_from_filedata(data[:-padding_len]) # WalletBitcoinj._load_from_filedata() parses the bitcoinj wallet + + +############### mSIGNA ############### + +@register_wallet_class +class WalletMsigna(object): + opencl_algo = -1 + + def data_extract_id(): + return "ms" + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + # returns "maybe yes" or "definitely no" (Bither wallets are also SQLite 3) + return None if wallet_file.read(16) == b"SQLite format 3\0" else False + + def __init__(self, loading = False): + assert loading, 'use load_from_* to create a ' + self.__class__.__name__ + aes_library_name = load_aes256_library().__name__ + self._passwords_per_second = 50000 if aes_library_name == "Crypto" else 5000 + + def __setstate__(self, state): + # (re-)load the required libraries after being unpickled + load_aes256_library(warnings=False) + self.__dict__ = state + + def passwords_per_seconds(self, seconds): + return max(int(round(self._passwords_per_second * seconds)), 1) + + # Load an encrypted privkey and salt from the specified keychain given a filename of an mSIGNA vault + @classmethod + def load_from_filename(cls, wallet_filename): + # Find the one keychain to test passwords against or exit trying + import sqlite3 + wallet_conn = sqlite3.connect(wallet_filename) + wallet_conn.row_factory = sqlite3.Row + select = "SELECT * FROM Keychain" + try: + if "args" in globals() and args.msigna_keychain: # args is not defined during unit tests + wallet_cur = wallet_conn.execute(select + " WHERE name LIKE '%' || ? || '%'", (args.msigna_keychain,)) + else: + wallet_cur = wallet_conn.execute(select) + except sqlite3.OperationalError as e: + if str(e).startswith("no such table"): + raise ValueError("Not an mSIGNA wallet: " + str(e)) # it might be a Bither or Bitcoin Core wallet + else: + raise# unexpected error + keychain = wallet_cur.fetchone() + if not keychain: + error_exit("no such keychain found in the mSIGNA vault") + keychain_extra = wallet_cur.fetchone() + if keychain_extra: + print("Multiple matching keychains found in the mSIGNA vault:", file=sys.stderr) + print(" ", keychain["name"]) + print(" ", keychain_extra["name"]) + for keychain_extra in wallet_cur: + print(" ", keychain_extra["name"]) + error_exit("use --msigna-keychain NAME to specify a specific keychain") + wallet_conn.close() + + privkey_ciphertext = keychain["privkey_ciphertext"] + if len(privkey_ciphertext) == 32: + error_exit("mSIGNA keychain '"+keychain["name"]+"' is not encrypted") + if len(privkey_ciphertext) != 48: + error_exit("mSIGNA keychain '"+keychain["name"]+"' has an unexpected privkey length") + + # only need the final 2 encrypted blocks (half of which is padding) plus the salt + self = cls(loading=True) + self._part_encrypted_privkey = privkey_ciphertext[-32:] + self._salt = struct.pack("< q", keychain["privkey_salt"]) + return self + + # Import an encrypted privkey and salt that was extracted by extract-msigna-privkey.py + @classmethod + def load_from_data_extract(cls, privkey_data): + self = cls(loading=True) + self._part_encrypted_privkey = privkey_data[:32] + self._salt = privkey_data[32:] + return self + + def difficulty_info(self): + return "2 SHA-256 iterations" + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def return_verified_password_or_false(self, passwords): #mSIGNA + # Copy some vars into local for a small speed boost + l_sha1 = hashlib.sha1 + l_sha256 = hashlib.sha256 + part_encrypted_privkey = self._part_encrypted_privkey + salt = self._salt + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + password_hashed = l_sha256(l_sha256(password).digest()).digest() # mSIGNA does this first + # + # mSIGNA's remaining KDF is OpenSSL's EVP_BytesToKey using SHA1 and an iteration count of + # 5. The EVP_BytesToKey outer loop is unrolled with two iterations below which produces + # 320 bits (2x SHA1's output) which is > 32 bytes (what's needed for the AES-256 key) + derived_part1 = password_hashed + salt + for i in range(5): # 5 is mSIGNA's hard coded iteration count + derived_part1 = l_sha1(derived_part1).digest() + derived_part2 = derived_part1 + password_hashed + salt + for i in range(5): + derived_part2 = l_sha1(derived_part2).digest() + # + part_privkey = aes256_cbc_decrypt(derived_part1 + derived_part2[:12], part_encrypted_privkey[:16], part_encrypted_privkey[16:]) + # + # If the last block (bytes 16-31) of part_encrypted_privkey is all padding, we've found it + if part_privkey == b"\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10\x10": + return password.decode("utf_8", "replace"), count + + return False, count + + +############### Electrum ############### + +# Comman base class for all Electrum wallets +class WalletElectrum(object): + opencl_algo = -1 + + def __init__(self, loading = False): + assert loading, 'use load_from_* to create a ' + self.__class__.__name__ + aes_library_name = load_aes256_library().__name__ + self._passwords_per_second = 100000 if aes_library_name == "Crypto" else 5000 + + def __setstate__(self, state): + # (re-)load the required libraries after being unpickled + load_aes256_library(warnings=False) + self.__dict__ = state + + def passwords_per_seconds(self, seconds): + return max(int(round(self._passwords_per_second * seconds)), 1) + + # Import Electrum encrypted data extracted by an extract-electrum* script + @classmethod + def load_from_data_extract(cls, data): + assert len(data) == 32 + self = cls(loading=True) + self._iv = data[:16] # the 16-byte IV + self._part_encrypted_data = data[16:] # 16-bytes of encrypted data + return self + + def difficulty_info(self): + return "2 SHA-256 iterations" + +@register_wallet_class +class WalletElectrum1(WalletElectrum): + + def data_extract_id(): + return "el" + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + # returns "maybe yes" or "definitely no" + return None if wallet_file.read(2) == b"{'" else False + + # Load an Electrum wallet file (the part of it we need) + @classmethod + def load_from_filename(cls, wallet_filename): + from ast import literal_eval + with open(wallet_filename) as wallet_file: + try: + wallet = literal_eval(wallet_file.read(MAX_WALLET_FILE_SIZE)) # up to 64M, typical size is a few k + except SyntaxError as e: # translate any SyntaxError into a + raise ValueError(e) # ValueError as expected by load_wallet() + return cls._load_from_dict(wallet) + + @classmethod + def _load_from_dict(cls, wallet): + seed_version = wallet.get("seed_version") + if seed_version is None: raise ValueError("Unrecognized wallet format (Electrum1 seed_version not found)") + if seed_version != 4: raise NotImplementedError("Unsupported Electrum1 seed version " + str(seed_version)) + if not wallet.get("use_encryption"): raise RuntimeError("Electrum1 wallet is not encrypted") + seed_data = base64.b64decode(wallet["seed"]) + if len(seed_data) != 64: raise RuntimeError("Electrum1 encrypted seed plus iv is not 64 bytes long") + self = cls(loading=True) + self._iv = seed_data[:16] # only need the 16-byte IV plus + self._part_encrypted_data = seed_data[16:32] # the first 16-byte encrypted block of the seed + return self + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + assert b"0" < b"9" < b"a" < b"f" # the hex check below assumes ASCII ordering in the interest of speed + def return_verified_password_or_false(self, passwords): #Electrum1 + # Copy some vars into local for a small speed boost + l_sha256 = hashlib.sha256 + l_aes256_cbc_decrypt = aes256_cbc_decrypt + part_encrypted_seed = self._part_encrypted_data + iv = self._iv + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + key = l_sha256( l_sha256( password ).digest() ).digest() + seed = l_aes256_cbc_decrypt(key, iv, part_encrypted_seed) + # If the first 16 bytes of the encrypted seed is all lower-case hex, we've found it + for c in seed: + if type(c) == str: + c = ord(c.encode()) + + if c > ord("f") or c < ord("0") or ord("9") < c < ord("a"): break # not hex + else: # if the loop above doesn't break, it's all hex + return password.decode("utf_8", "replace"), count + + return False, count + +@register_wallet_class +class WalletElectrum2(WalletElectrum): + + def data_extract_id(): + return "e2" + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + # returns "maybe yes" or "definitely no" + electrumWalletFileStart = wallet_file.read(1) + return None if electrumWalletFileStart == b"{" else False + + # Load an Electrum wallet file (the part of it we need) + @classmethod + def load_from_filename(cls, wallet_filename): + import json + + with open(wallet_filename) as wallet_file: + wallet = json.load(wallet_file) + wallet_type = wallet.get("wallet_type") + if not wallet_type: + raise ValueError("Unrecognized wallet format (Electrum2 wallet_type not found)") + if wallet_type == "old": # if it's been converted from 1.x to 2.y (y<7), return a WalletElectrum1 object + return WalletElectrum1._load_from_dict(wallet) + if not wallet.get("use_encryption"): + raise ValueError("Electrum2 wallet is not encrypted") + seed_version = wallet.get("seed_version", "(not found)") + try: + if wallet.get("seed_version") < 11: # all versions above 2.x + raise NotImplementedError("Unsupported Electrum2 seed version " + str(seed_version)) + + except TypeError: # Seed version is none... Likely imported loose key wallet... + if wallet_type != "imported": + raise NotImplementedError("Unsupported Electrum2 seed version " + str(seed_version)) + else: + pass + + xprv = None + while True: # "loops" exactly once; only here so we've something to break out of + + # Electrum 2.7+ standard wallets have a keystore + keystore = wallet.get("keystore") + if keystore: + keystore_type = keystore.get("type", "(not found)") + + # Wallets originally created by an Electrum 2.x version + if keystore_type == "bip32": + xprv = keystore.get("xprv") + if xprv: break + + # Former Electrum 1.x wallet after conversion to Electrum 2.7+ standard-wallet format + elif keystore_type == "old": + seed_data = keystore.get("seed") + if seed_data: + # Construct and return a WalletElectrum1 object + seed_data = base64.b64decode(seed_data) + if len(seed_data) != 64: + raise RuntimeError("Electrum1 encrypted seed plus iv is not 64 bytes long") + self = WalletElectrum1(loading=True) + self._iv = seed_data[:16] # only need the 16-byte IV plus + self._part_encrypted_data = seed_data[16:32] # the first 16-byte encrypted block of the seed + return self + + # Imported loose private keys + elif keystore_type == "imported": + for privkey in keystore["keypairs"].values(): + if privkey: + # Construct and return a WalletElectrumLooseKey object + privkey = base64.b64decode(privkey) + if len(privkey) != 80: + raise RuntimeError("Electrum2 private key plus iv is not 80 bytes long") + self = WalletElectrumLooseKey(loading=True) + self._iv = privkey[-32:-16] # only need the 16-byte IV plus + self._part_encrypted_data = privkey[-16:] # the last 16-byte encrypted block of the key + return self + + else: + print("Warning: found unsupported keystore type " + keystore_type, file=sys.stderr) + + # Electrum 2.7+ multisig or 2fa wallet + for i in itertools.count(1): + x = wallet.get("x{}/".format(i)) + if not x: break + x_type = x.get("type", "(not found)") + if x_type == "bip32": + xprv = x.get("xprv") + if xprv: break + else: + print("Warning: found unsupported key type " + x_type, file=sys.stderr) + if xprv: break + + # Electrum 2.0 - 2.6.4 wallet with imported loose private keys + if wallet_type == "imported": + for imported in wallet["accounts"]["/x"]["imported"].values(): + privkey = imported[1] if len(imported) >= 2 else None + if privkey: + # Construct and return a WalletElectrumLooseKey object + privkey = base64.b64decode(privkey) + if len(privkey) != 80: + raise RuntimeError("Electrum2 private key plus iv is not 80 bytes long") + self = WalletElectrumLooseKey(loading=True) + self._iv = privkey[-32:-16] # only need the 16-byte IV plus + self._part_encrypted_data = privkey[-16:] # the last 16-byte encrypted block of the key + return self + + # Electrum 2.0 - 2.6.4 wallet (of any other wallet type) + else: + mpks = wallet.get("master_private_keys") + if mpks: + xprv = list(mpks.values())[0] + break + + raise RuntimeError("No master private keys or seeds found in Electrum2 wallet") + + xprv_data = base64.b64decode(xprv) + if len(xprv_data) != 128: + raise RuntimeError("Unexpected Electrum2 encrypted master private key length") + self = cls(loading=True) + self._iv = xprv_data[:16] # only need the 16-byte IV plus + self._part_encrypted_data = xprv_data[16:32] # the first 16-byte encrypted block of a master privkey + return self # (the member variable name comes from the base class) + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + assert b"1" < b"9" < b"A" < b"Z" < b"a" < b"z" # the b58 check below assumes ASCII ordering in the interest of speed + def return_verified_password_or_false(self, passwords): #Electrum2 + # Copy some vars into local for a small speed boost + l_sha256 = hashlib.sha256 + l_aes256_cbc_decrypt = aes256_cbc_decrypt + part_encrypted_xprv = self._part_encrypted_data + iv = self._iv + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + key = l_sha256( l_sha256( password ).digest() ).digest() + xprv = l_aes256_cbc_decrypt(key, iv, part_encrypted_xprv) + + if xprv.startswith(b"xprv") or xprv.startswith(b"zprv"): # BIP32 extended private key version bytes + for c in xprv[4:]: + # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z] + if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": break # not base58 + else: # if the loop above doesn't break, it's base58 + return password.decode("utf_8", "replace"), count + + return False, count + +@register_wallet_class +class WalletElectrumLooseKey(WalletElectrum): + + def data_extract_id(): + return "ek" + + @staticmethod + def is_wallet_file(wallet_file): return False # WalletElectrum2.load_from_filename() creates us + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + assert b"1" < b"9" < b"A" < b"Z" < b"a" < b"z" # the b58 check below assumes ASCII ordering in the interest of speed + def return_verified_password_or_false(self, passwords): #ElectrumLooseKey + # Copy some vars into local for a small speed boost + l_sha256 = hashlib.sha256 + l_aes256_cbc_decrypt = aes256_cbc_decrypt + encrypted_privkey_end = self._part_encrypted_data + iv = self._iv + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + key = l_sha256( l_sha256( password ).digest() ).digest() + privkey_end = l_aes256_cbc_decrypt(key, iv, encrypted_privkey_end) + padding_len = privkey_end[-1] + # Check for valid PKCS7 padding for a 52 or 51 byte "WIF" private key + # (4*16-byte-blocks == 64, 64 - 52 or 51 == 12 or 13 + if (padding_len == 12 or padding_len == 13) and privkey_end.endswith((chr(padding_len) * padding_len).encode()): + for c in privkey_end[:-padding_len]: + # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z] + if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": break # not base58 + else: # if the loop above doesn't break, it's base58 + return password.decode("utf_8", "replace"), count + + return False, count + + +@register_wallet_class +class WalletElectrum28(object): + opencl_algo = -1 + + def passwords_per_seconds(self, seconds): + return max(int(round(self._passwords_per_second * seconds)), 1) + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + try: + base64walletData = wallet_file.read(8) + data = base64.b64decode(base64walletData) + except: return False + return data[:4] == b"BIE1" # Electrum 2.8+ magic + + def __init__(self, loading = False): + assert loading, 'use load_from_* to create a ' + self.__class__.__name__ + global hmac, coincurve + import hmac + + try: + import coincurve + except ModuleNotFoundError: + exit("\nERROR: Cannot load coincurve module... Be sure to install all requirements with the command 'pip3 install -r requirements.txt', see https://btcrecover.readthedocs.io/en/latest/INSTALL/") + + pbkdf2_library_name = load_pbkdf2_library().__name__ + self._aes_library_name = load_aes256_library().__name__ + self._passwords_per_second = 800 if pbkdf2_library_name == "hashlib" else 140 + + def __getstate__(self): + # Serialize unpicklable coincurve.PublicKey object + state = self.__dict__.copy() + state["_ephemeral_pubkey"] = self._ephemeral_pubkey.format(compressed=False) + return state + + def __setstate__(self, state): + # Restore coincurve.PublicKey object and (re-)load the required libraries + global hmac, coincurve + import hmac + + try: + import coincurve + except ModuleNotFoundError: + exit("\nERROR: Cannot load coincurve module... Be sure to install all requirements with the command 'pip3 install -r requirements.txt', see https://btcrecover.readthedocs.io/en/latest/INSTALL/") + + load_pbkdf2_library(warnings=False) + load_aes256_library(warnings=False) + self.__dict__ = state + self._ephemeral_pubkey = coincurve.PublicKey(self._ephemeral_pubkey) + + # Load an Electrum 2.8 encrypted wallet file + @classmethod + def load_from_filename(cls, wallet_filename): + with open(wallet_filename) as wallet_file: + data = wallet_file.read(MAX_WALLET_FILE_SIZE) # up to 64M, typical size is a few k + if len(data) >= MAX_WALLET_FILE_SIZE: + raise ValueError("Encrypted Electrum wallet file is too big") + MIN_LEN = 37 + 32 + 32 # header + ciphertext + trailer + if len(data) < MIN_LEN * 4 / 3: + raise EOFError("Expected at least {} bytes of text in the Electrum wallet file".format(int(math.ceil(MIN_LEN * 4 / 3)))) + data = base64.b64decode(data) + if len(data) < MIN_LEN: + raise EOFError("Expected at least {} bytes of decoded data in the Electrum wallet file".format(MIN_LEN)) + assert data[:4] == b"BIE1", "wallet file has Electrum 2.8+ magic" + + self = cls(loading=True) + self._ephemeral_pubkey = coincurve.PublicKey(data[4:37]) + self._ciphertext_beg = data[37:37+16] # first ciphertext block + self._ciphertext_end = data[-64:-32] # last two blocks (before mac) + self._mac = data[-32:] + self._all_but_mac = data[:-32] + return self + + def difficulty_info(self): + return "1024 PBKDF2-SHA512 iterations + ECC" + + def return_verified_password_or_false(self, passwords): # Electrum28 + return self._return_verified_password_or_false_opencl(passwords) if (not isinstance(self.opencl_algo,int)) \ + else self._return_verified_password_or_false_cpu(passwords) + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def _return_verified_password_or_false_cpu(self, passwords): #Electrum28 + cutils = coincurve.utils + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), passwords) + + for count, password in enumerate(passwords, 1): + + # Derive the ECIES shared public key, and from it, the AES and HMAC keys + static_privkey = pbkdf2_hmac("sha512", password, b"", 1024, 64) + # Electrum uses a 512-bit private key (why?), but libsecp256k1 expects a 256-bit key < group's order: + static_privkey = cutils.int_to_bytes( cutils.bytes_to_int(static_privkey) % cutils.GROUP_ORDER_INT ) + shared_pubkey = self._ephemeral_pubkey.multiply(static_privkey).format() + keys = hashlib.sha512(shared_pubkey).digest() + + # Check the MAC + computed_mac = hmac.new(keys[32:], self._all_but_mac, hashlib.sha256).digest() + if computed_mac == self._mac: + return password.decode("utf_8", "replace"), count + + return False, count + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def _return_verified_password_or_false_opencl(self, arg_passwords): #Electrum28 + cutils = coincurve.utils + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + clResult = self.opencl_algo.cl_pbkdf2(self.opencl_context_pbkdf2_sha512, passwords, b"", 1024, 64) + + # This list is consumed, so recreated it and zip + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + results = zip(passwords, clResult) + + for count, (password, static_privkey) in enumerate(results, 1): + # Electrum uses a 512-bit private key (why?), but libsecp256k1 expects a 256-bit key < group's order: + static_privkey = cutils.int_to_bytes( cutils.bytes_to_int(static_privkey) % cutils.GROUP_ORDER_INT ) + shared_pubkey = self._ephemeral_pubkey.multiply(static_privkey).format() + keys = hashlib.sha512(shared_pubkey).digest() + + # Check the MAC + computed_mac = hmac.new(keys[32:], self._all_but_mac, hashlib.sha256).digest() + if computed_mac == self._mac: + return password.decode("utf_8", "replace"), count + + return False, count + + +############### Blockchain ############### + +@register_wallet_class +class WalletBlockchain(object): + + #Some of these strings are concatenated to 10 chars, as a the full string may not fit in the single decrypted block + matchStrings = b"\"guid\"|\"sharedKey\"|\"double_enc|\"dpasswordh|\"metadataHD|\"options\"|\"address_bo|\"tx_notes\"|\"tx_names\"|\"keys\"|\"hd_wallets|\"paidTo\"" + + opencl_algo = -1 + + _savepossiblematches = True + _possible_passwords_file = "possible_passwords.log" + + _dump_privkeys_file = None + _dump_wallet_file = None + _using_extract = False + + def data_extract_id(): + return "bk" + + # + # These are a bit fragile in the interest of simplicity because they assume that certain + # JSON data will be in the first block of the file + # + + # Encryption scheme used in newer wallets + def decrypt_current(self,password, salt_and_iv, iter_count, data): + key = pbkdf2_hmac("sha1", password, salt_and_iv, iter_count, 32) + decrypted = aes256_cbc_decrypt(key, salt_and_iv, data) # CBC mode + padding = ord(decrypted[-1:]) # ISO 10126 padding length + # A bit fragile because it assumes the guid is in the first encrypted block, + # although this has always been the case as of 6/2014 (since 12/2011) + # As of May 2020, guid no longer appears in the first block, but tx_notes appears there instead + return decrypted[:-padding] if 1 <= padding <= 16 and re.search(self.matchStrings, decrypted) else None + + # + # Encryption scheme only used in version 0.0 wallets (N.B. this is untested) + def decrypt_old(self, password, salt_and_iv, data): + key = pbkdf2_hmac("sha1", password, salt_and_iv, 1, 32) # only 1 iteration + decrypted = aes256_ofb_decrypt(key, salt_and_iv, data) # OFB mode + # The 16-byte last block, reversed, with all but the first byte of ISO 7816-4 padding removed: + last_block = tuple(itertools.dropwhile(lambda x: x == b"\0", decrypted[:15:-1])) + padding = 17 - len(last_block) # ISO 7816-4 padding length + return decrypted[:-padding] if 1 <= padding <= 16 and \ + decrypted[-padding] == b"\x80" and \ + re.match(self.matchStrings,decrypted.decode()) else None + + def decrypt_wallet(self,password): + from lib.cashaddress import base58 + + # Can't decrypt or dump an extract in any meaninful way... + if self._using_extract: + return + + # If we aren't dumping these files, then just return... + if not (self._dump_wallet_file or self._dump_privkeys_file): + return + + #print(self._encrypted_wallet) + + # Convert and split encrypted private key + #encrypted = base64.b64decode(self._encrypted_wallet) + iv, encrypted = self._encrypted_wallet[:16], self._encrypted_wallet[16:] + + if self._iter_count: # v2.0 wallets have a single possible encryption scheme + data = self.decrypt_current(password, iv, self._iter_count, encrypted) + else: # v0.0 wallets have three different possible encryption schemes + data = self.decrypt_current(password, iv, 10, encrypted) or \ + self.decrypt_current(password, iv, 1, encrypted) or \ + self.decrypt_old(password, iv, encrypted) + + # Load and parse the now-decrypted wallet + self._wallet_json = json.loads(data) + + # Add these items to the json for their associated address + for key in self._wallet_json['keys']: + try: + # Need to check that the private key is actually 64 characters (32 bytes) long, as some blockchain wallets + # have a bug where the base58 private keys in wallet files leave off any leading zeros... + privkey = binascii.hexlify(base58.b58decode(key["priv"])) + privkey = privkey.zfill(64) + privkey = binascii.unhexlify(privkey) + + # Some versions of blockchain wallets can be inconsistent in whether they used compressed or uncompressed addresses + # Rather than do something clever like check the addr key for to check which, just dump both for now... + key['privkey_compressed'] = base58.b58encode_check(bytes([0x80]) + privkey + bytes([0x1])) + key['privkey_uncompressed'] = base58.b58encode_check(bytes([0x80]) + privkey) + except ValueError: + print("Error: Private Key not correctly decrypted, likey due to second password being present...") + + if self._dump_wallet_file: + self.dump_wallet() + if self._dump_privkeys_file: + self.dump_privkeys() + + # This just dumps the wallet json as-is (regardless of whether the keys have been decrypted + def dump_wallet(self): + with open(self._dump_wallet_file, 'a') as logfile: + logfile.write(json.dumps(self._wallet_json, indent=4)) + + # This just dumps the wallet private keys + def dump_privkeys(self): + with open(self._dump_privkeys_file, 'a') as logfile: + logfile.write("Private Keys (For copy/paste in to Electrum) are below...\n") + + for key in self._wallet_json['keys']: + # Blockchain.com wallets are fairly inconsistent in whether they used + # compressed or uncompressed keys, so produce both... + try: + logfile.write(key['privkey_compressed'] + "\n") + logfile.write(key['privkey_uncompressed'] + "\n") + except KeyError: + print("Error: Private Key not correctly decrypted, likey due to second password being present...") + + # Older wallets don't have any hd_wallets at all, so handle this gracefully + try: + for hd_wallets in self._wallet_json['hd_wallets']: + for accounts in hd_wallets['accounts']: + logfile.write(accounts['xpriv'] + "\n") + except: + pass + + @staticmethod + def is_wallet_file(wallet_file): return None # there's no easy way to check this + + def __init__(self, iter_count, loading = False): + assert loading, 'use load_from_* to create a ' + self.__class__.__name__ + pbkdf2_library_name = load_pbkdf2_library().__name__ + aes_library_name = load_aes256_library().__name__ + self._iter_count = iter_count + self._passwords_per_second = 400000 if pbkdf2_library_name == "hashlib" else 100000 + if iter_count == 0: # if it's a v0 wallet + iter_count = 10 + self._passwords_per_second /= iter_count + if aes_library_name != "Crypto" and self._passwords_per_second > 2000: + self._passwords_per_second = 2000 + + def __setstate__(self, state): + # (re-)load the required libraries after being unpickled + load_pbkdf2_library(warnings=False) + load_aes256_library(warnings=False) + self.__dict__ = state + + def passwords_per_seconds(self, seconds): + return max(int(round(self._passwords_per_second * seconds)), 1) + + # Load a Blockchain wallet file (the part of it we need) + @classmethod + def load_from_filename(cls, wallet_filename): + with open(wallet_filename) as wallet_file: + data, iter_count = cls._parse_encrypted_blockchain_wallet(wallet_file.read(MAX_WALLET_FILE_SIZE)) # up to 64M, typical size is a few k + self = cls(iter_count, loading=True) + self._salt_and_iv = data[:16] # only need the salt_and_iv plus + self._encrypted_block = data[16:32] # the first 16-byte encrypted block + self._encrypted_wallet = data + return self + + # Parse the contents of an encrypted blockchain wallet (v0 - v3) or config file returning two + # values in a tuple: (encrypted_data_blob, iter_count) where iter_count == 0 for v0 wallets + @staticmethod + def _parse_encrypted_blockchain_wallet(data): + iter_count = 0 + + while True: # "loops" exactly once; only here so we've something to break out of + # Most blockchain files (except v0.0 wallets) are JSON encoded; try to parse it as such + try: + data = json.loads(data) + except ValueError: break + + # Config files have no version attribute; they encapsulate the wallet file plus some detrius + if "version" not in data: + try: + data = data["payload"] # extract the wallet file from the config + except KeyError: + raise ValueError("Can't find either version nor payload attributes in Blockchain file") + try: + data = json.loads(data) # try again to parse a v2.0/v3.0 JSON-encoded wallet file + except ValueError: break + + # Extract what's needed from a v2.0/3.0 wallet file + if data["version"] > 4: + raise NotImplementedError("Unsupported Blockchain wallet version " + str(data["version"])) + iter_count = data["pbkdf2_iterations"] + if not isinstance(iter_count, int) or iter_count < 1: + raise ValueError("Invalid Blockchain pbkdf2_iterations " + str(iter_count)) + data = data["payload"] + + break + + # Either the encrypted data was extracted from the "payload" field above, or + # this is a v0.0 wallet file whose entire contents consist of the encrypted data + try: + data = base64.b64decode(data) + except TypeError as e: + raise ValueError("Can't base64-decode Blockchain wallet: "+str(e)) + if len(data) < 32: + raise ValueError("Encrypted Blockchain data is too short") + #Used to check if the length of the decrypted data was divisible by 16, but this wasn't actually true for all v0 wallets + #if len(data) % 16 != 0: + # raise ValueError("Encrypted Blockchain data length is not divisible by the encryption blocksize (16)") + + # If this is (possibly) a v0.0 (a.k.a. v1) wallet file, check that the encrypted data + # looks random, otherwise this could be some other type of base64-encoded file such + # as a MultiBit key file (it should be safe to skip this test for v2.0+ wallets) + if not iter_count: # if this is a v0.0 wallet + # The likelihood of of finding a valid encrypted blockchain wallet (even at its minimum length + # of about 500 bytes) with less than 7.4 bits of entropy per byte is less than 1 in 10^6 + # (decreased test below to 7.0 after being shown a wallet with 7.0 entropy bits) + entropy_bits = est_entropy_bits(data) + if entropy_bits < 7.0: + raise ValueError("Doesn't look random enough to be an encrypted Blockchain wallet (only {:.1f} bits of entropy per byte)".format(entropy_bits)) + + return data, iter_count # iter_count == 0 for v0 wallets + + # Import extracted Blockchain file data necessary for main password checking + @classmethod + def load_from_data_extract(cls, file_data): + # These are the same first encrypted block, salt_and_iv, iteration count retrieved above + encrypted_block, salt_and_iv, iter_count = struct.unpack(b"< 16s 16s I", file_data) + self = cls(iter_count, loading=True) + self._encrypted_block = encrypted_block + self._salt_and_iv = salt_and_iv + self._using_extract = True + return self + + def difficulty_info(self): + return "{:,} PBKDF2-SHA1 iterations".format(self._iter_count or 10) + + def init_logfile(self): + with open(self._possible_passwords_file, 'a') as logfile: + logfile.write( + "\n\n" + + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " New Recovery Started...\n" + + "This file contains passwords and blocks from passwords which `may` not exactly match those that " + "BTCRecover searches for by default. \n\n" + "Examples of successfully decrypted blocks will not just be random characters, " + "some examples of what correctly decryped blocks logs look like are:\n\n" + "Possible Password ==>btcr-test-password<== in Decrypted Block ==>{\n\"guid\" : \"9bb<==\n" + "Possible Password ==>testblockchain<== in Decrypted Block ==>{\"address_book\":<==\n" + "Possible Password ==>btcr-test-password<== in Decrypted Block ==>{\"tx_notes\":{},\"\n" + "Possible Password ==>Testing123!<== in Decrypted Block ==>{\"double_encrypt<==\n" + "\n" + "Note: The markers ==> and <== are not part of either your password or the decrypted block...\n\n" + "If the password works and was not correctly found, or your wallet detects a false positive, please report the decrypted block data at " + "https://github.com/3rdIteration/btcrecover/issues/\n\n") + print("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *") + print("* Note for Blockchain.com Wallets... *") + print("* *") + print("* Writing all `possibly matched` and fully matched Passwords & *") + print("* Decrypted blocks to ", self._possible_passwords_file) + print("* This can be disabled with the --disablesavepossiblematches argument *") + print("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *") + print() + + # A bit fragile because it assumes that some specific text is in the first encrypted block, + # This was "guid" as of 6/2014 (since 12/2011) + # As of May 2020, guid no longer appears in the first block, but 'tx_notes' appears there instead + # Also check to see if the first block starts with 'address_book' + # first as was apparently the case with some wallets created around Jan 2014 + # (see https://github.com/gurnec/btcrecover/issues/ that start with "double_encryption" + # as per this issue here: https://github.com/3rdIteration/btcrecover/issues/96 + def check_blockchain_decrypted_block(self, unencrypted_block, password): + # Return True if + if re.search(self.matchStrings, unencrypted_block): + if self._savepossiblematches: + try: + return True # Only return true if we can successfully decode the block in to ascii + + except UnicodeDecodeError: # Likely a false positive if we can't... + with open('possible_passwords.log', 'a', encoding="utf_8") as logfile: + logfile.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + " Found Likely False Positive Password (with non-Ascii characters in decrypted block) ==>" + + password.decode("utf_8") + + "<== in Decrypted Block ==>" + + unencrypted_block.decode("utf-8", "ignore") + + "<==\n") + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "**NOTICE** Possible (Unlikely) Match Found, recorded in possible_passwords.log") + + elif unencrypted_block[0] == ord("{"): + if b'"' in unencrypted_block[:4]: # If it really is a json wallet fragment, there will be a double quote in there within the first few characters... + try: + # Try to decode the decrypted block to ascii, this will pretty much always fail on anything other + # than the correct password + unencrypted_block.decode("ascii") + if self._savepossiblematches: + with open(self._possible_passwords_file, 'a', encoding="utf_8") as logfile: + logfile.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + " Possible Password ==>" + + password.decode("utf_8") + + "<== in Decrypted Block ==>" + + unencrypted_block.decode("ascii") + + "<==\n") + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "**NOTICE** Possible (Unlikely) Match Found, recorded in possible_passwords.log") + except UnicodeDecodeError: + pass + + + + return False + + def return_verified_password_or_false(self, passwords): # Blockchain.com Main Password + return self._return_verified_password_or_false_opencl(passwords) if (not isinstance(self.opencl_algo,int)) \ + else self._return_verified_password_or_false_cpu(passwords) + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def _return_verified_password_or_false_cpu(self, arg_passwords): # Blockchain.com Main Password + # Copy a few globals into local for a small speed boost + l_pbkdf2_hmac = pbkdf2_hmac + l_aes256_cbc_decrypt = aes256_cbc_decrypt + l_aes256_ofb_decrypt = aes256_ofb_decrypt + encrypted_block = self._encrypted_block + salt_and_iv = self._salt_and_iv + iter_count = self._iter_count + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + v0 = not iter_count # version 0.0 wallets don't specify an iter_count + if v0: iter_count = 10 # the default iter_count for version 0.0 wallets + + for count, password in enumerate(passwords, 1): + key = l_pbkdf2_hmac("sha1", password, salt_and_iv, iter_count, 32) # iter_count iterations + unencrypted_block = l_aes256_cbc_decrypt(key, salt_and_iv, encrypted_block) # CBC mode + + if self.check_blockchain_decrypted_block(unencrypted_block, password): + # Decrypt and dump the wallet if required + self.decrypt_wallet(password) + return password.decode("utf_8", "replace"), count + + if v0: + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + # Try the older encryption schemes possibly used in v0.0 wallets + for count, password in enumerate(passwords, 1): + key = l_pbkdf2_hmac("sha1", password, salt_and_iv, 1, 32) # only 1 iteration + unencrypted_block = l_aes256_cbc_decrypt(key, salt_and_iv, encrypted_block) # CBC mode + # print("CBC:", unencrypted_block) + if self.check_blockchain_decrypted_block(unencrypted_block, password): + return password.decode("utf_8", "replace"), count + + unencrypted_block = l_aes256_ofb_decrypt(key, salt_and_iv, encrypted_block) # OFB mode + # print("OBF:", unencrypted_block) + if self.check_blockchain_decrypted_block(unencrypted_block, password): + return password.decode("utf_8", "replace"), count + + return False, count + + def _return_verified_password_or_false_opencl(self, arg_passwords): # Blockchain.com Main Password + # Copy a few globals into local for a small speed boost + l_aes256_cbc_decrypt = aes256_cbc_decrypt + l_aes256_ofb_decrypt = aes256_ofb_decrypt + encrypted_block = self._encrypted_block + salt_and_iv = self._salt_and_iv + iter_count = self._iter_count + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + clResult = self.opencl_algo.cl_pbkdf2(self.opencl_context_pbkdf2_sha1, passwords, salt_and_iv, iter_count, 32) + + #This list is consumed, so recreated it and zip + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + results = zip(passwords, clResult) + + for count, (password,key) in enumerate(results, 1): + unencrypted_block = l_aes256_cbc_decrypt(key, salt_and_iv, encrypted_block) # CBC mode + if self.check_blockchain_decrypted_block(unencrypted_block, password): + return password.decode("utf_8", "replace"), count + + return False, count + + +@register_wallet_class +class WalletBlockchainSecondpass(WalletBlockchain): + + _dump_privkeys_file = None + _dump_wallet_file = None + _using_extract = False + + def data_extract_id(): + return "bs" + + def decrypt_secondpass_privkey(self, encrypted, password, iterations, legacy_decrypt): + # Convert and split encrypted private key + encrypted = base64.b64decode(encrypted) + iv, encrypted = encrypted[:16], encrypted[16:] + + # Create the decryption key and decrypt the private key + aeshash = pbkdf2_hmac("sha1", password, iv, iterations, 32) + if not legacy_decrypt: + clear = aes256_cbc_decrypt(aeshash, iv, encrypted) + else: + clear = aes256_ofb_decrypt(aeshash, iv, encrypted) + + # Remove ISO 10126 Padding + pad_len = clear[-1] + decrypted = clear[:-pad_len] + return decrypted + + def decrypt_wallet(self, password, iter_count, legacy_decrypt = False): + from lib.cashaddress import base58 + + # Can't decrypt or dump an extract in any meaninful way... + if self._using_extract: + return + + # If we aren't dumping these files, then just return... + if not (self._dump_wallet_file or self._dump_privkeys_file): + return + + # Decrypt the keys and add these items to the json for their associated address + for key in self._wallet_json['keys']: + privkey = self.decrypt_secondpass_privkey(key["priv"], + self._wallet_json['sharedKey'].encode('ascii') + password, + iter_count, legacy_decrypt) + + key['priv_decrypted'] = base58.b58encode(base58.b58decode(privkey)) + + # Need to check that the private key is actually 64 characters (32 bytes) long, as some blockchain wallets + # have a bug where the base58 private keys in wallet files leave off any leading zeros... + privkey = binascii.hexlify(base58.b58decode(privkey)) + privkey = privkey.zfill(64) + privkey = binascii.unhexlify(privkey) + + # Some versions of blockchain wallets can be inconsistent in whether they used compressed or uncompressed addresses + # Rather than do something clever like check the addr key for to check which, just dump both for now... + key['privkey_compressed'] = base58.b58encode_check(bytes([0x80]) + privkey + bytes([0x1])) + key['privkey_uncompressed'] = base58.b58encode_check(bytes([0x80]) + privkey) + + # Older wallets don't have any hd_wallets at all, so handle this gracefully + try: + for hd_wallets in self._wallet_json['hd_wallets']: + for accounts in hd_wallets['accounts']: + accounts['xpriv_decrypted'] = self.decrypt_secondpass_privkey(accounts["xpriv"], + self._wallet_json['sharedKey'].encode('ascii') + password, + iter_count, legacy_decrypt).decode() + except: + pass + + if self._dump_wallet_file: + self.dump_wallet() + + if self._dump_privkeys_file: + self.dump_privkeys() + + # This just dumps the wallet json as-is (regardless of whether the keys have been decrypted + def dump_wallet(self): + with open(self._dump_wallet_file, 'a') as logfile: + logfile.write(json.dumps(self._wallet_json, indent=4)) + + # This just dumps the wallet private keys + def dump_privkeys(self): + with open(self._dump_privkeys_file, 'a') as logfile: + logfile.write("Private Keys (For copy/paste in to Electrum) are below...\n") + + for key in self._wallet_json['keys']: + # Blockchain.com wallets are fairly inconsistent in whether they used + # compressed or uncompressed keys, so produce both... + logfile.write(key['privkey_compressed'] + "\n") + logfile.write(key['privkey_uncompressed'] + "\n") + + try: + for hd_wallets in self._wallet_json['hd_wallets']: + for accounts in hd_wallets['accounts']: + logfile.write(accounts['xpriv_decrypted']+ "\n") + except: + pass + + + @staticmethod + def is_wallet_file(wallet_file): return False # never auto-detected as this wallet type + + # Load a Blockchain wallet file to get the "Second Password" hash, + # decrypting the wallet if necessary + @classmethod + def load_from_filename(cls, wallet_filename, password = None, force_purepython = False): + from uuid import UUID + + with open(wallet_filename) as wallet_file: + data = wallet_file.read(MAX_WALLET_FILE_SIZE) # up to 64M, typical size is a few k + + try: + # Assuming the wallet is encrypted, get the encrypted data + data, iter_count = cls._parse_encrypted_blockchain_wallet(data) + except ValueError as e: + # This is the one error to expect and ignore which occurs when the wallet isn't encrypted + if e.args[0] == "Can't find either version nor payload attributes in Blockchain file": + pass + else: + raise + except Exception as e: + error_exit(str(e)) + else: + # If there were no problems getting the encrypted data, decrypt it + if not password: + password = prompt_unicode_password( + "Please enter the Blockchain wallet's main password: ", + "encrypted Blockchain files must be decrypted before searching for the second password") + password = password.encode("utf_8") + data, salt_and_iv = data[16:], data[:16] + load_pbkdf2_library(force_purepython) + load_aes256_library(force_purepython) + + if iter_count: # v2.0 wallets have a single possible encryption scheme + data = cls.decrypt_current(cls, password, salt_and_iv, iter_count, data) + else: # v0.0 wallets have three different possible encryption schemes + data = cls.decrypt_current(cls, password, salt_and_iv, 10, data) or \ + cls.decrypt_current(cls, password, salt_and_iv, 1, data) or \ + cls.decrypt_old(cls, password, salt_and_iv, data) + if not data: + error_exit("can't decrypt wallet (wrong main password?)") + + # Load and parse the now-decrypted wallet + data = json.loads(data) + if not data.get("double_encryption"): + error_exit("double encryption with a second password is not enabled for this wallet") + + # Extract and save what we need to perform checking on the second password + try: + iter_count = data["options"]["pbkdf2_iterations"] + if not isinstance(iter_count, int) or iter_count < 1: + raise ValueError("Invalid Blockchain second password pbkdf2_iterations " + str(iter_count)) + except KeyError: + iter_count = 0 + self = cls(iter_count, loading=True) + # + self._password_hash = base64.b16decode(data["dpasswordhash"], casefold=True) + if len(self._password_hash) != 32: + raise ValueError("Blockchain second password hash is not 32 bytes long") + # + + self._salt = data["sharedKey"].encode("ascii") + + if str(UUID(self._salt.decode().replace("-",""))).encode() != self._salt: + raise ValueError("Unrecognized Blockchain salt format") + + self._wallet_json = data + + return self + + # Import extracted Blockchain file data necessary for second password checking + @classmethod + def load_from_data_extract(cls, file_data): + from uuid import UUID + # These are the same second password hash, salt, iteration count retrieved above + password_hash, uuid_salt, iter_count = struct.unpack(b"< 32s 16s I", file_data) + self = cls(iter_count, loading=True) + self._salt = str(UUID(bytes=uuid_salt)) + self._password_hash = password_hash + self._using_extract = True + return self + + def difficulty_info(self): + return ("{:,}".format(self._iter_count) if self._iter_count else "1-10") + " SHA-256 iterations" + + def return_verified_password_or_false(self, passwords): # Blockchain.com second Password + return self._return_verified_password_or_false_opencl(passwords) if (not isinstance(self.opencl_algo,int)) \ + else self._return_verified_password_or_false_cpu(passwords) + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def _return_verified_password_or_false_cpu(self, arg_passwords): # Blockchain.com Secondpassword + # Copy vars into locals for a small speed boost + l_sha256 = hashlib.sha256 + password_hash = self._password_hash + salt = self._salt + iter_count = self._iter_count + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + for count, password in enumerate(passwords, 1): + + # Newer wallets specify an iter_count and use something similar to PBKDF1 with SHA-256 + if iter_count: + if isinstance(salt,str): running_hash = salt.encode() + password + if isinstance(salt,bytes): running_hash = salt + password + for i in range(iter_count): + running_hash = l_sha256(running_hash).digest() + if running_hash == password_hash: + #print("Debug: Matched Second pass (Iter-Count present)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, iter_count) + return password.decode("utf_8", "replace"), count + + # Older wallets used one of three password hashing schemes + # 2022-03 Update - It also seems that some newer (v3) wallets use these older hashing schemes too... + if isinstance(salt,str): running_hash = l_sha256(salt.encode() + password).digest() + if isinstance(salt, bytes): running_hash = l_sha256(salt + password).digest() + # Just a single SHA-256 hash + if running_hash == password_hash: + #print("Debug: Matched Second pass (Single Hash)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, 1) + return password.decode("utf_8", "replace"), count + # Exactly 10 hashes (the first of which was done above) + for i in range(9): + running_hash = l_sha256(running_hash).digest() + if running_hash == password_hash: + #print("Debug: Matched Second pass (Exactly 10 hashes)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, 10) + return password.decode("utf_8", "replace"), count + # A single unsalted hash + if l_sha256(password).digest() == password_hash: + #print("Debug: Matched Second pass (Single Unsalted Hash)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, 1, True) + return password.decode("utf_8", "replace"), count + + return False, count + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def _return_verified_password_or_false_opencl(self, arg_passwords): # Blockchain.com Secondpassword + # Copy vars into locals for a small speed boost + l_sha256 = hashlib.sha256 + password_hash = self._password_hash + salt = self._salt + iter_count = self._iter_count + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + hashed_keys = [] + for password in passwords: + if isinstance(salt, str): derived_key = salt.encode() + password + if isinstance(salt, bytes): derived_key = salt + password + hashed_keys.append(l_sha256(derived_key).digest()) + + if iter_count: + clResult = self.opencl_algo.cl_hash_iterations(self.opencl_context_hash_iterations_sha256, hashed_keys, self._iter_count-1, 8) + + # This list is consumed, so recreated it and zip + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + results = zip(passwords, clResult) + + # Newer wallets specify an iter_count and use something similar to PBKDF1 with SHA-256 + for count, (password, derived_key) in enumerate(results, 1): + if derived_key == password_hash: + self.decrypt_wallet(password, iter_count) + return password.decode("utf_8", "replace"), count + + # Older wallets used one of three password hashing schemes + # 2022-03 Update - It also seems that some newer (v3) wallets use these older hashing schemes too... + # (These older encryption schemes aren't worth running on the GPU, too few iterations) + + # Convert Unicode strings (lazily) to UTF-8 bytestrings + passwords = map(lambda p: p.encode("utf_8", "ignore"), arg_passwords) + + for count, password in enumerate(passwords, 1): + if isinstance(salt, str): running_hash = l_sha256(salt.encode() + password).digest() + if isinstance(salt, bytes): running_hash = l_sha256(salt + password).digest() + # Just a single SHA-256 hash + if running_hash == password_hash: + # print("Debug: Matched Second pass (Single Hash)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, 1) + return password.decode("utf_8", "replace"), count + # Exactly 10 hashes (the first of which was done above) + for i in range(9): + running_hash = l_sha256(running_hash).digest() + if running_hash == password_hash: + # print("Debug: Matched Second pass (Exactly 10 hashes)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, 10) + return password.decode("utf_8", "replace"), count + # A single unsalted hash + if l_sha256(password).digest() == password_hash: + # print("Debug: Matched Second pass (Single Unsalted Hash)") + # Decrypt wallet and dump if required + self.decrypt_wallet(password, 1, True) + return password.decode("utf_8", "replace"), count + + + return False, count + +############### Block.io ############### + +@register_wallet_class +class WalletBlockIO(object): + opencl_algo = -1 + _savepossiblematches = False + + _dump_privkeys_file = None + _dump_wallet_file = None + _using_extract = False + + def __init__(self): + try: + import ecdsa + except ModuleNotFoundError: + exit( + "\nERROR: Cannot load ecdsa module which is required for block.io wallets... You can install it with the command 'pip3 install ecdsa") + + try: + import bitcoinutils + except ModuleNotFoundError: + exit( + "\nERROR: Cannot load bitcoin-utils module which is required for block.io wallets... You can install it with the command 'pip3 install bitcoin-utils'") + + @staticmethod + def is_wallet_file(wallet_file): + wallet_file.seek(0) + try: + walletdata = wallet_file.read() + except: return False + return (b"user_key" in walletdata and b"encrypted_passphrase" in walletdata) # Block.io wallets have a user_key field and emcrypted passphrase fields which are quite unique + + def passwords_per_seconds(self, seconds): + try: + if self.user_key['algorithm']['pbkdf2_iterations'] == 2048: + return 5000 + else: + return 150 # Newer wallets use over 100,000 PBKDF2 iterations + except KeyError: # Older Legacy wallets don't have a algorithm key at all... + return 5000 + + # Load a Dogechain wallet file + @classmethod + def load_from_filename(cls, wallet_filename): + self = cls() + with open(wallet_filename, "rb") as wallet_file: + wallet_data = wallet_file.read() + + json_data = json.loads(wallet_data) + # There are two ways that the JSON from block.io can be formatted, depending on which backup the user retrieves + try: + self.user_key = json_data['data']['current_user_keys'][0]['user_key'] + except KeyError: + self.user_key = json_data['data']['user_key'] + return self + + def difficulty_info(self): + try: + iter_count = self.user_key['algorithm']['pbkdf2_iterations'] + hash_function = self.user_key['algorithm']['pbkdf2_hash_function'] + except KeyError: + iter_count = 2048 + hash_function = "SHA256" + return str(iter_count) + " " + hash_function + " Iterations" + + # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a password + # is correct return it, else return False for item 0; return a count of passwords checked for item 1 + def return_verified_password_or_false(self, arg_passwords): # block.io Main Password + + for count, password in enumerate(arg_passwords, 1): + try: + key = lib.block_io.BlockIo.Helper.dynamicExtractKey(self.user_key, password) + if self.user_key['public_key'].encode() == key.pubkey_hex(): + return password, count + + except lib.block_io.IncorrectDecryptionPasswordError: + pass + except binascii.Error: + pass + + return False, count + +# btcrpass.py -- btcrecover main library +# Copyright (C) 2014-2017 Christopher Gurnee +# 2020 Jefferson Nunn and Gaith +# 2019-2021 Stephen Rothery +# +# This file is part of btcrecover. +# +# btcrecover is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version +# 2 of the License, or (at your option) any later version. +# +# btcrecover is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see http://www.gnu.org/licenses/ + + +# TODO: put everything in a class? +# TODO: pythonize comments/documentation + +__version__ = "1.13.0-Cryptoguide" +__ordering_version__ = b"0.6.4" # must be updated whenever password ordering changes +disable_security_warnings = True +progress_bar_widgets_global = None # Will store custom progress bar widgets # Import modules included in standard libraries import sys, argparse, itertools, string, re, multiprocessing, signal, os, pickle, gc, \ @@ -3822,9 +6588,7 @@ def _return_verified_password_or_false_opencl(self, arg_passwords): # BIP38 Encr passwords = map(lambda p: normalize("NFC", p).encode("utf_8", "ignore"), arg_passwords) results = zip(passwords, clResult) for count, (password, scrypthash) in enumerate(results, 1): - self.decrypted_privkey = bip38decrypt_non_ec(scrypthash, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address) - if self.decrypted_privkey: - print("Decrypted BIP38 Key:", self.decrypted_privkey) + if bip38decrypt_non_ec(scrypthash, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address): return password.decode("utf_8", "replace"), count else: clPrefactors = self.opencl_algo.cl_scrypt(self.opencl_context_scrypt, passwords, 14, 3, 3, 32, self.salt) @@ -3833,9 +6597,7 @@ def _return_verified_password_or_false_opencl(self, arg_passwords): # BIP38 Encr passwords = map(lambda p: normalize("NFC", p).encode("utf_8", "ignore"), arg_passwords) results = zip(passwords, clPrefactors, encseedbs) for count, (password, prefactor, encseedb) in enumerate(results, 1): - self.decrypted_privkey = bip38decrypt_ec(prefactor, encseedb, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address) - if self.decrypted_privkey: - print("Decrypted BIP38 Key:", self.decrypted_privkey) + if bip38decrypt_ec(prefactor, encseedb, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address): return password.decode("utf_8", "replace"), count return False, count @@ -3849,17 +6611,14 @@ def _return_verified_password_or_false_cpu(self, passwords): # BIP38 Encrypted P for count, password in enumerate(passwords, 1): if not self.ec_multiplied: scrypthash = l_scrypt(password, self.salt, 1 << 14, 8, 8, 64) - self.decrypted_privkey = bip38decrypt_non_ec(scrypthash, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address) - if self.decrypted_privkey: - print("Decrypted BIP38 Key:", self.decrypted_privkey) + if bip38decrypt_non_ec(scrypthash, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address): return password.decode("utf_8", "replace"), count else: prefactor = l_scrypt(password, self.salt, 1 << 14, 8, 8, 32) passpoint = prefactor_to_passpoint(prefactor, self.has_lotsequence_flag, self.enc_privkey) encseedb = l_scrypt(passpoint, self.enc_privkey[0:12], 1024, 1, 1, 64) - self.decrypted_privkey = bip38decrypt_ec(prefactor, encseedb, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address) - if self.decrypted_privkey: - print("Decrypted BIP38 Key:", self.decrypted_privkey) + + if bip38decrypt_ec(prefactor, encseedb, self.enc_privkey, self.has_compression_flag, self.has_lotsequence_flag, network_prefix = self.network.prefix_address): return password.decode("utf_8", "replace"), count return False, count @@ -4868,7 +7627,7 @@ class WalletRawPrivateKey(object): ]) def __init__(self, addresses = None, addressdb = None, check_compressed = True, check_uncompressed = True, - force_check_p2sh = False, crypto = 'bitcoin', is_performance = False, correct_wallet_password = None): + force_check_p2sh = False, crypto = 'bitcoin', is_performance = False): global hmac, coincurve, base58 if not hashlib_ripemd160_available: print("Warning: Native RIPEMD160 not available via Hashlib, using Pure-Python (This will significantly reduce performance)") @@ -4902,8 +7661,6 @@ def __init__(self, addresses = None, addressdb = None, check_compressed = True, input_address_standard = False self.address_type_checks = [] - self.hash160s = None - if addresses: if self.crypto == 'ethereum': self.hash160s = btcrseed.WalletEthereum._addresses_to_hash160s(addresses) @@ -4916,8 +7673,14 @@ def __init__(self, addresses = None, addressdb = None, check_compressed = True, input_address_p2sh = True else: input_address_standard = True + else: + print("No Addresses Provided ... ") + print("Loading address database ...") + + if not addressdb: + print("No AddressDB specified, trying addresses.db") + addressdb = "addresses.db" - if addressdb: self.hash160s = AddressSet.fromfile(open(addressdb, "rb")) print("Loaded", len(self.hash160s), "addresses from database ...") input_address_p2sh = True @@ -4926,8 +7689,6 @@ def __init__(self, addresses = None, addressdb = None, check_compressed = True, if input_address_p2sh or force_check_p2sh: self.address_type_checks.append(True) if input_address_standard and not (force_check_p2sh): self.address_type_checks.append(False) - self.correct_wallet_password = correct_wallet_password - def __setstate__(self, state): # (re-)load the required libraries after being unpickled global hmac, coincurve, base58, pylibscrypt @@ -4972,51 +7733,17 @@ def return_verified_password_or_false(self, passwords): # Raw Privatekey elif len(password) == 52 and password[0] in ["L","K"]: #Compressed Private Key try: - # Check whether we have a valid Base58check first (This will weed out most invalid options) - base58.b58decode_check(password) - print("***NOTICE*** Found possible Private key (Has valid Base58Checksum):", password, "checking if supplied address matches...") - - # Convert to Hex for checking against supplied address password = binascii.hexlify(base58.b58decode_check(password)[1:-1]) WIFPrivKey = True - except: continue elif len(password) == 51 and password[0] == "5": # Uncompressed Private Key try: - # Check whether we have a valid Base58check first (This will weed out most invalid options) - base58.b58decode_check(password) - print("***NOTICE*** Found possible Private key (Has valid Base58Checksum):", password, "checking if supplied address matches...") - - # Convert to Hex for checking against supplied address password = binascii.hexlify(base58.b58decode_check(password)[1:]) WIFPrivKey = True except: continue - elif len(password) == 58 and password[0:3] == "6Pn": # BIP38 Encrypted Private key - try: - # Check whether we have a valid Base58check first (This will weed out most invalid options) - base58.b58decode_check(password) - print("***NOTICE*** Found possible BIP38 Private key (Has valid Base58Checksum):", password, "checking if supplied address matches...") - - except: - continue - - if self.correct_wallet_password: - print("Attempting BIP38 decryption...") - test_wallet = WalletBIP38(enc_privkey = password) - correct_password, count = test_wallet.return_verified_password_or_false([self.correct_wallet_password]) - if correct_password: - password = binascii.hexlify(base58.b58decode_check(test_wallet.decrypted_privkey)[1:-1]) - else: - print("Incorrect decryption password supplied, unable to check further...") - continue - - WIFPrivKey = True - else: - print("No decryption password supplied, unable to check further...") - continue else: # Unsupported Private Key continue @@ -5033,15 +7760,6 @@ def return_verified_password_or_false(self, passwords): # Raw Privatekey print(message) continue - # Don't spam this at performance measurement step - if password != "9cf68de3a8bec8f4649a5a1eb9340886a68a85c0c3ae722393ef3dd7a6c4da58": - if not self.hash160s: - if WIFPrivKey: - print("Warning: No addresses supplied, unable to check Base58 private key any further.. ") - else: - print("Warning: No addresses supplied combined with hexidicimal private key, this will never find a result... ") - - # Convert the private keys to public keys and addresses for verification. for isCompressed in self.compression_checks: @@ -5940,7 +8658,7 @@ def clean_autosave_args(argList, listName): # # TODO: document kwds usage (as used by unit tests) def parse_arguments(effective_argv, wallet = None, base_iterator = None, - perf_iterator = None, inserted_items = None, check_only = None, disable_security_warning_param = False, **kwds): + perf_iterator = None, inserted_items = None, check_only = None, disable_security_warning_param = False, progress_bar_widgets = None, **kwds): # effective_argv is what we are effectively given, either via the command line, via embedded # options in the tokenlist file, or as a result of restoring a session, before any argument # processing or defaulting is done (unless it's is done by argparse). Each time effective_argv @@ -6539,8 +9257,7 @@ def parse_arguments(effective_argv, wallet = None, base_iterator = None, check_compressed = not(args.skip_compressed), check_uncompressed = not(args.skip_uncompressed), force_check_p2sh = args.force_check_p2sh, - crypto=args.wallet_type, - correct_wallet_password = args.correct_wallet_password) + crypto=args.wallet_type) # Set the default number of threads to use. For GPU processing, things like hyperthreading are unhelpful, so use physical cores only... if not args.threads: @@ -9086,26 +11803,31 @@ def windows_ctrl_handler(signal): print("Using", args.threads, "worker", "threads" if args.threads > 1 else "thread") # (they're actually worker processes) if have_progress: + if args.no_eta: + progress = progressbar.ProgressBar(maxval=progressbar.UnknownLength, poll=0.1, widgets=[ progressbar.AnimatedMarker(), progressbar.FormatLabel(" %(value)d elapsed: %(elapsed)s rate: "), progressbar.FileTransferSpeed(unit="P") ]) - progress.update_interval = sys.maxsize # work around performance bug in ProgressBar else: if args.dynamic_passwords_count: try: passwords_count = passwords_count_generator.__next__() - except StopIteration: passwords_count = 0 - progress = progressbar.ProgressBar(maxval=passwords_count, poll=0.1, widgets=[ + widgets = progress_bar_widgets_global if progress_bar_widgets_global else [ progressbar.SimpleProgress(), " ", progressbar.Bar(left="[", fill="-", right="]"), - progressbar.FormatLabel(" %(elapsed)s, "), + " ", + progressbar.FileTransferSpeed(unit="P"), # This will auto-scale to kP/s + " ", + progressbar.FormatLabel("elapsed: %(elapsed)s, "), progressbar.ETA() - ]) + ] + progress = progressbar.ProgressBar(maxval=passwords_count, poll=0.1, widgets=widgets) + else: progress = None if args.dynamic_passwords_count: @@ -9211,6 +11933,7 @@ def windows_ctrl_handler(signal): progress.maxval = current_passwords_count.value if passwords_counting_result.ready() and not passwords_counting_result.successful(): passwords_counting_result.get() + progress.update(passwords_tried) if l_savestate and passwords_tried % est_passwords_per_5min == 0: do_autosave(args.skip + passwords_tried) diff --git a/btcrecover/btcrseed.py b/btcrecover/btcrseed.py index ce7a4922..2a1a0c5e 100644 --- a/btcrecover/btcrseed.py +++ b/btcrecover/btcrseed.py @@ -43,6 +43,7 @@ import lib.cardano.cardano_utils as cardano import lib.stacks.c32 as c32 from lib.p2tr_helper import P2TR_tools +from lib import progressbar # Enable functions that may not work for some standard libraries in some environments hashlib_ripemd160_available = False @@ -107,6 +108,7 @@ def ripemd160(msg): except: pass +from math import factorial _T = TypeVar("_T") @@ -1533,7 +1535,7 @@ def config_mnemonic(self, mnemonic_guess = None, lang = None, passphrases = [u"" # Specifically, update self._words and the globals mnemonic_ids_guess and close_mnemonic_ids. if self._lang.endswith(self.FIRSTFOUR_TAG): long_lang_words = self._language_words[self._lang[:-len(self.FIRSTFOUR_TAG)]] - assert isinstance(long_lang_words[0], str), "long words haven't yet been converted into bytes" + assert isinstance(long_lang_words[0], str), "long words haven't yet been converted into bytes" assert isinstance(self._words[0], str), "short words have already been converted into bytes" assert len(long_lang_words) == len(self._words), "long and short word lists have the same length" long_lang_words = [ self._unicode_to_bytes(l) for l in long_lang_words ] @@ -1543,7 +1545,7 @@ def config_mnemonic(self, mnemonic_guess = None, lang = None, passphrases = [u"" global mnemonic_ids_guess # the to-be-replaced short-words guess long_ids_guess = () # the new long-words guess for short_id in mnemonic_ids_guess: - long_ids_guess += None if short_id is None else short_to_long[short_id], + long_ids_guess += None if short_id is None else short_to_long[short_id], # *now* convert to BIP39's format mnemonic_ids_guess = long_ids_guess # global close_mnemonic_ids @@ -1719,7 +1721,7 @@ def _verify_checksum(self, mnemonic_words): try: bit_string = "".join(self._word_to_binary[w] for w in mnemonic_words) except: - # only get here if there was something wrong with the nemonic words + # only get here if there was something wrong with the nemonic sentence: print ("invalid nemonic sentence: ") print (mnemonic_words) return False @@ -3452,14 +3454,46 @@ def run_btcrecover(typos, big_typos = 0, min_typos = 0, is_performance = False, if num_replacecloseword < typos: l_btcr_args += " --max-typos-replacecloseword " + str(num_replacecloseword) + # Add ETA calculation before btcrpass.parse_arguments + if tokenlist and not any('--no-eta' in arg for arg in extra_args): + try: + with open(tokenlist, 'r') as tokenlist_file: + lines = parse_tokenlist_for_eta(tokenlist_file) + if lines is not None: + seed_length = mnemonic_length if mnemonic_length else 12 + total_count = calculate_total_permutations(lines, seed_length) + print(f"Will test {total_count:,} possible seed phrases") + # Set up progress tracking + btcrpass.passwords_count = total_count + btcrpass.count_and_check_eta = lambda *args, **kwargs: total_count + btcrpass.has_progress = True + btcrpass.max_eta = total_count + btcrpass.show_progress = True + btcrpass.show_speed = True + btcrpass.display_performance = True + except Exception as e: + print(f"Notice: Using default progress calculation method ({str(e)})") + + # Configure progress bar + progress_bar_widgets = [ + progressbar.SimpleProgress(), " ", + progressbar.Bar(left="[", fill="-", right="]"), + " ", + progressbar.FileTransferSpeed(unit="P"), + " ", + progressbar.FormatLabel("elapsed: %(elapsed)s, "), + progressbar.ETA() + ] + btcrpass.parse_arguments( l_btcr_args.split() + extra_args, inserted_items= ids_to_try_inserting, wallet= loaded_wallet, - base_iterator= (mnemonic_ids_guess,) if not is_performance else None, # the one guess to modify + base_iterator= (mnemonic_ids_guess,) if not is_performance else None, perf_iterator= lambda: loaded_wallet.performance_iterator(), check_only= loaded_wallet.verify_mnemonic_syntax, - disable_security_warning_param=True + disable_security_warning_param=True, + progress_bar_widgets=progress_bar_widgets # Pass the widgets to btcrpass ) (mnemonic_found, not_found_msg) = btcrpass.main() @@ -4165,3 +4199,143 @@ def show_mnemonic_gui(mnemonic_sentence, path_coin): entry.focus_set() tk_root.mainloop() # blocks until the user closes the window pause_at_exit = False + +def parse_tokenlist_for_eta(tokenlist_file): + """ + Parse the token list into a list of lists, where each sublist contains unique alternatives for a line. + Returns None if the list has fewer than 12 lines. + """ + try: + lines = [] + for line in tokenlist_file: + line = line.strip() + if line and not line.startswith('#'): # Skip empty lines and comments + alternatives = [word.strip() for word in line.split(',')] + unique_alts = list(dict.fromkeys(alternatives)) # Remove duplicates within line + if unique_alts: # Only add if there are valid alternatives + if len(unique_alts) != len(alternatives): + print(f"Notice: Removed duplicates from line: {alternatives}") + lines.append(unique_alts) + + if len(lines) < 12: + print(f"Error: Token list must contain at least 12 lines (found {len(lines)})") + return None + return lines + except Exception as e: + print(f"Error reading token list: {str(e)}") + return None + +def get_multisets(n_needed, num_types, max_freqs): + """ + Generate all multisets of size n_needed from num_types distinct types with maximum frequencies. + Yields tuples of frequencies for each type. + """ + def generate_multisets(prefix, remaining, num_elements): + if num_elements == 0: + if remaining == 0: + yield tuple(prefix) + return + start = prefix[-1] if prefix else 0 + max_freq = max_freqs[len(prefix)] + for i in range(start, min(remaining, max_freq) + 1): + yield from generate_multisets(prefix + [i], remaining - i, num_elements - 1) + yield from generate_multisets([], n_needed, num_types) + +def calculate_total_permutations(lines, seed_length=12): + """ + Calculate the total number of unique seed phrases. + Parameters: + lines: List of lists, where each inner list contains alternative words for a line. + seed_length: Integer, length of the seed phrase (defaults to 12 for BIP39) + """ + if not lines: + print("Error: Token list is empty") + return 0 + + if len(lines) < seed_length: + print(f"Error: Token list has insufficient lines ({len(lines)}) for seed length {seed_length}") + return 0 + + # Convert lines to tuples for hashability (to identify duplicates) + line_tuples = [tuple(line) for line in lines] + + # Count frequencies of each distinct line type + line_freq = collections.Counter(line_tuples) + distinct_lines = list(line_freq.keys()) # Unique line types + frequencies = list(line_freq.values()) # How many times each type appears + num_distinct = len(distinct_lines) + + # Number of alternatives per distinct line type + choices = [len(line) for line in distinct_lines] + + # Generate all combinations of choosing xi instances of each line type, where sum(xi) = seed_length + def generate_combinations(current, remaining, index): + if index == num_distinct: + if remaining == 0: + yield tuple(current) + return + fi = frequencies[index] + # xi ranges from 0 to min(remaining, fi) + for xi in range(min(remaining, fi) + 1): + yield from generate_combinations(current + [xi], remaining - xi, index + 1) + + total = 0 + + # Iterate over all valid combinations + for combo in generate_combinations([], seed_length, 0): + # Compute permutations: seed_length! / (x1! * x2! * ... * xk!) + denominator = 1 + for xi in combo: + if xi > 0: + denominator *= factorial(xi) + perms = factorial(seed_length) // denominator + + # Compute choices: c1^x1 * c2^x2 * ... * ck^xk + choice_product = 1 + for xi, ci in zip(combo, choices): + if xi > 0: + choice_product *= ci ** xi + + total += perms * choice_product + + return total + +def enable_progress(): + """Enable progress tracking without disabling ETA""" + global passwords_count + if not passwords_count: + return False + return True + +# Progress bar configuration for seed recovery +def configure_seed_progress(tokenlist=None, extra_args=None): + if tokenlist and not any('--no-eta' in arg for arg in (extra_args or [])): + try: + with open(tokenlist, 'r') as tokenlist_file: + lines = parse_tokenlist_for_eta(tokenlist_file) + if lines is not None: + seed_length = mnemonic_length if mnemonic_length else 12 + total_count = calculate_total_permutations(lines, seed_length) + print(f"Will test {total_count:,} possible seed phrases") + # Set up progress tracking + btcrpass.passwords_count = total_count + btcrpass.count_and_check_eta = lambda *args, **kwargs: total_count + btcrpass.has_progress = True + btcrpass.max_eta = total_count + btcrpass.show_progress = True + btcrpass.show_speed = True + btcrpass.display_performance = True + except Exception as e: + print(f"Notice: Using default progress calculation method ({str(e)})") + + # Configure progress bar widgets + progress_bar_widgets = [ + progressbar.SimpleProgress(), " ", + progressbar.Bar(left="[", fill="-", right="]"), + " ", + progressbar.FileTransferSpeed(unit="P"), + " ", + progressbar.FormatLabel("elapsed: %(elapsed)s, "), + progressbar.ETA() + ] + btcrpass.progress_bar_widgets_global = progress_bar_widgets