Source code for mbgdml._gdml.train

# MIT License
#
# Copyright (c) 2018-2022, Stefan Chmiela
# Copyright (c) 2022-2023, Alex M. Maldonado
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# pylint: disable=too-many-branches, too-many-statements

from functools import partial
import multiprocessing as mp
import numpy as np
import psutil

from .solvers.analytic import Analytic
from .predict import GDMLPredict
from .desc import Desc
from .sample import draw_strat_sample
from .perm import find_perms
from ..utils import md5_data, chunk_array
from ..losses import mae, rmse
from ..logger import GDMLLogger
from .. import _version

mbgdml_version = _version.get_versions()["version"]

Pool = mp.get_context("fork").Pool

try:
    import torch
except ImportError:
    _HAS_TORCH = False
else:
    _HAS_TORCH = True

try:
    _TORCH_MPS_IS_AVAILABLE = torch.backends.mps.is_available()
except (NameError, AttributeError):
    _TORCH_MPS_IS_AVAILABLE = False
_TORCH_MPS_IS_AVAILABLE = False

try:
    _TORCH_CUDA_IS_AVAILABLE = torch.cuda.is_available()
except (NameError, AttributeError):
    _TORCH_CUDA_IS_AVAILABLE = False

# TODO: remove exception handling once iterative solver ships
try:
    from .solvers.iterative import Iterative
except ImportError:
    pass

log = GDMLLogger(__name__)


def _share_array(arr_np, typecode_or_type):
    r"""Return a ctypes array allocated from shared memory with data from a
    NumPy array.

    Parameters
    ----------
    arr_np : :obj:`numpy.ndarray`
        NumPy array.
    typecode_or_type : char or ``ctype``
        Either a ctypes type or a one character typecode of the
        kind used by the Python array module.

    Returns
    -------
    Array of ``ctype``.
    """
    arr = mp.RawArray(typecode_or_type, arr_np.ravel())
    return arr, arr_np.shape


[docs]def _assemble_kernel_mat_wkr( j, tril_perms_lin, sig, use_E_cstr=False, exploit_sym=False, cols_m_limit=None ): r"""Compute one row and column of the force field kernel matrix. The Hessian of the Matern kernel is used with n = 2 (twice differentiable). Each row and column consists of matrix-valued blocks, which encode the interaction of one training point with all others. The result is stored in shared memory (a global variable). Parameters ---------- j : :obj:`int` Index of training point. tril_perms_lin : :obj:`numpy.ndarray` of :obj:`int` 1D array containing all recovered permutations expanded as one large permutation to be applied to a tiled copy of the object to be permuted. sig : :obj:`int` or :obj:`float` Hyperparameter sigma (kernel length scale). use_E_cstr : :obj:`bool`, optional Include energy constraints in the kernel. This can sometimes be helpful in tricky cases. exploit_sym : :obj:`bool`, optional Do not create symmetric entries of the kernel matrix twice (this only works for specific inputs for ``cols_m_limit``) cols_m_limit : :obj:`int`, optional Limit the number of columns (include training points 1-``M``). Note that each training points consists of multiple columns. Returns ------- :obj:`int` Number of kernel matrix blocks created, divided by 2 (symmetric blocks are always created at together). """ # pylint: disable=invalid-name, undefined-variable global glob # pylint: disable=global-variable-not-assigned R_desc = np.frombuffer(glob["R_desc"]).reshape(glob["R_desc_shape"]) R_d_desc = np.frombuffer(glob["R_d_desc"]).reshape(glob["R_d_desc_shape"]) K = np.frombuffer(glob["K"]).reshape(glob["K_shape"]) desc_func = glob["desc_func"] n_train, dim_d = R_d_desc.shape[:2] n_atoms = int((1 + np.sqrt(8 * dim_d + 1)) / 2) dim_i = 3 * n_atoms n_perms = int(len(tril_perms_lin) / dim_d) # (block index in final K, block index global, indices of partials within block) if isinstance(j, tuple): # Selective/"fancy" indexing ( K_j, j, keep_idxs_3n, ) = j blk_j = slice(K_j, K_j + len(keep_idxs_3n)) else: # Sequential indexing K_j = j * dim_i if j < n_train else n_train * dim_i + (j % n_train) blk_j = slice(K_j, K_j + dim_i) if j < n_train else slice(K_j, K_j + 1) keep_idxs_3n = slice(None) # same as [:] # Note: The modulo-operator wraps around the index pointer on the training points # when energy constraints are used in the kernel. In that case each point is # accessed twice. # Create permutated variants of 'rj_desc' and 'rj_d_desc'. rj_desc_perms = np.reshape( np.tile(R_desc[j % n_train, :], n_perms)[tril_perms_lin], (n_perms, -1), order="F", ) rj_d_desc = desc_func.d_desc_from_comp(R_d_desc[j % n_train, :, :])[0][ :, keep_idxs_3n ] # convert descriptor back to full representation rj_d_desc_perms = np.reshape( np.tile(rj_d_desc.T, n_perms)[:, tril_perms_lin], (-1, dim_d, n_perms) ) mat52_base_div = 3 * sig**4 sqrt5 = np.sqrt(5.0) sig_pow2 = sig**2 dim_i_keep = rj_d_desc.shape[1] diff_ab_outer_perms = np.empty((dim_d, dim_i_keep)) diff_ab_perms = np.empty((n_perms, dim_d)) ri_d_desc = np.zeros((1, dim_d, dim_i)) # must be zeros! k = np.empty((dim_i, dim_i_keep)) if ( j < n_train ): # This column only contains second and first derivative constraints. # for i in range(j if exploit_sym else 0, n_train): for i in range(0, n_train): blk_i = slice(i * dim_i, (i + 1) * dim_i) # diff_ab_perms = R_desc[i, :] - rj_desc_perms np.subtract(R_desc[i, :], rj_desc_perms, out=diff_ab_perms) norm_ab_perms = sqrt5 * np.linalg.norm(diff_ab_perms, axis=1) mat52_base_perms = np.exp(-norm_ab_perms / sig) / mat52_base_div * 5 np.einsum( "ki,kj->ij", diff_ab_perms * mat52_base_perms[:, None] * 5, np.einsum("ki,jik -> kj", diff_ab_perms, rj_d_desc_perms), out=diff_ab_outer_perms, ) diff_ab_outer_perms -= np.einsum( "ikj,j->ki", rj_d_desc_perms, (sig_pow2 + sig * norm_ab_perms) * mat52_base_perms, ) # ri_d_desc = desc_func.d_desc_from_comp(R_d_desc[i, :, :])[0] desc_func.d_desc_from_comp(R_d_desc[i, :, :], out=ri_d_desc) # K[blk_i, blk_j] = ri_d_desc[0].T.dot(diff_ab_outer_perms) np.dot(ri_d_desc[0].T, diff_ab_outer_perms, out=k) K[blk_i, blk_j] = k # this will never be called with 'keep_idxs_3n' set to anything else # than [:] if exploit_sym and (cols_m_limit is None or i < cols_m_limit): K[blk_j, blk_i] = K[blk_i, blk_j].T # First derivative constraints if use_E_cstr: K_fe = ( 5 * diff_ab_perms / (3 * sig**3) * (norm_ab_perms[:, None] + sig) * np.exp(-norm_ab_perms / sig)[:, None] ) K_fe = -np.einsum("ik,jki -> j", K_fe, rj_d_desc_perms) E_off_i = n_train * dim_i # , K.shape[1] - n_train K[E_off_i + i, blk_j] = K_fe else: if use_E_cstr: # rj_d_desc = desc_func.d_desc_from_comp(R_d_desc[j % n_train, :, :])[0][ # :, : # ] # convert descriptor back to full representation # rj_d_desc_perms = np.reshape( # np.tile(rj_d_desc.T, n_perms)[:, tril_perms_lin], (-1, dim_d, n_perms) # ) E_off_i = n_train * dim_i # Account for 'alloc_extra_rows'!. # blk_j_full = slice((j % n_train) * dim_i, ((j % n_train) + 1) * dim_i) # for i in range((j % n_train) if exploit_sym else 0, n_train): for i in range(0, n_train): ri_desc_perms = np.reshape( np.tile(R_desc[i, :], n_perms)[tril_perms_lin], (n_perms, -1), order="F", ) ri_d_desc = desc_func.d_desc_from_comp(R_d_desc[i, :, :])[ 0 ] # convert descriptor back to full representation ri_d_desc_perms = np.reshape( np.tile(ri_d_desc.T, n_perms)[:, tril_perms_lin], (-1, dim_d, n_perms), ) diff_ab_perms = R_desc[j % n_train, :] - ri_desc_perms norm_ab_perms = sqrt5 * np.linalg.norm(diff_ab_perms, axis=1) K_fe = ( 5 * diff_ab_perms / (3 * sig**3) * (norm_ab_perms[:, None] + sig) * np.exp(-norm_ab_perms / sig)[:, None] ) K_fe = -np.einsum("ik,jki -> j", K_fe, ri_d_desc_perms) blk_i_full = slice(i * dim_i, (i + 1) * dim_i) K[blk_i_full, K_j] = K_fe # vertical K[E_off_i + i, K_j] = -( 1 + (norm_ab_perms / sig) * (1 + norm_ab_perms / (3 * sig)) ).dot(np.exp(-norm_ab_perms / sig)) return blk_j.stop - blk_j.start
[docs]class GDMLTrain: r"""Train GDML force fields. This class is used to train models using different closed-form and numerical solvers. GPU support is provided through PyTorch (requires optional ``torch`` dependency to be installed) for some solvers. """ def __init__(self, max_memory=None, max_processes=None, use_torch=False): """ Parameters ---------- max_memory : :obj:`int`, default: :obj:`None` Limit the maximum memory usage. This is a soft limit that cannot always be enforced. max_processes : :obj:`int`, default: :obj:`None` Limit the max. number of processes. Otherwise all CPU cores are used. This parameters has no effect if ``use_torch=True`` use_torch : :obj:`bool`, default: ``False`` Use PyTorch to calculate predictions (if supported by solver). Raises ------ Exception If multiple instances of this class are created. ImportError If the optional PyTorch dependency is missing, but PyTorch features are used. """ global glob # pylint: disable=global-variable-undefined if "glob" not in globals(): # Don't allow more than one instance of this class. glob = {} else: raise Exception( "You can not create multiple instances of this class. " "Please reuse your first one." ) total_memory = psutil.virtual_memory().total // 2**30 # bytes to GB) self._max_memory = ( min(max_memory, total_memory) if max_memory is not None else total_memory ) total_cpus = mp.cpu_count() self._max_processes = ( min(max_processes, total_cpus) if max_processes is not None else total_cpus ) self._use_torch = use_torch if use_torch and not _HAS_TORCH: raise ImportError( "Optional PyTorch dependency not found! Please install PyTorch." ) def __del__(self): global glob # pylint: disable=global-variable-undefined if "glob" in globals(): del glob
[docs] def create_task( self, train_dataset, n_train, valid_dataset, n_valid, sig, lam=1e-10, perms=None, use_sym=True, use_E=True, use_E_cstr=False, use_cprsn=False, solver=None, solver_tol=1e-4, idxs_train=None, idxs_valid=None, ): r"""Create a data structure, :obj:`dict`, of custom type ``task``. These data structures serve as recipes for model creation, summarizing the configuration of one particular training run. Training and test points are sampled from the provided dataset, without replacement. If the same dataset if given for training and testing, the subsets are drawn without overlap. Each task also contains a choice for the hyperparameters of the training process and the MD5 fingerprints of the used datasets. Parameters ---------- train_dataset : :obj:`dict` Data structure of custom type ``dataset`` containing train dataset. n_train : :obj:`int` Number of training points to sample. valid_dataset : :obj:`dict` Data structure of custom type ``dataset`` containing validation dataset. n_valid : :obj:`int` Number of validation points to sample. sig : :obj:`int` Hyperparameter sigma (kernel length scale). lam : :obj:`float`, default: ``1e-10`` Hyperparameter lambda (regularization strength). .. note:: Early sGDML models used ``1e-15``. perms : :obj:`numpy.ndarray`, optional An 2D array of size P x N containing P possible permutations of the N atoms in the system. This argument takes priority over the ones provided in the training dataset. No automatic discovery is run when this argument is provided. use_sym : :obj:`bool`, default: ``True`` True: include symmetries (sGDML), False: GDML. use_E : :obj:`bool`, optional ``True``: reconstruct force field with corresponding potential energy surface, ``False``: ignore energy during training, even if energy labels are available in the dataset. The trained model will still be able to predict energies up to an unknown integration constant. Note, that the energy predictions accuracy will be untested. use_E_cstr : :obj:`bool`, default: ``False`` Include energy constraints in the kernel. This can sometimes be helpful in tricky cases. use_cprsn : :obj:`bool`, default: ``False`` Compress the kernel matrix along symmetric degrees of freedom. If ``False``, training is done on the full kernel matrix. solver : :obj:`str`, default: :obj:`None` Type of solver to use for training. ``'analytic'`` is currently the only option and defaults to this. Returns ------- :obj:`dict` Data structure of custom type ``task``. Raises ------ ValueError If a reconstruction of the potential energy surface is requested, but the energy labels are missing in the dataset. """ # pylint: disable=invalid-name ### mbGDML ADD ### log.info( "-----------------------------------\n" "| Creating GDML training task |\n" "-----------------------------------\n" ) log.log_model( { "Z": train_dataset["Z"], "n_train": n_train, "n_valid": n_valid, "sig": sig, "lam": lam, "use_sym": use_sym, "use_E": use_E, "use_E_cstr": use_E_cstr, "use_cprsn": use_cprsn, "type": "t", } ) ### mbGDML ADD END ### if use_E and "E" not in train_dataset: raise ValueError( "No energy labels found in dataset!\n" + "By default, force fields are always reconstructed including the\n" + "corresponding potential energy surface (this can be turned off).\n" + "However, the energy labels are missing in the provided dataset.\n" ) use_E_cstr = use_E and use_E_cstr # Is not needed in this function (mbGDML CHANGED) # n_atoms = train_dataset['R'].shape[1] ### mbGDML CHANGE ### log.info("\nDataset splitting\n-----------------") md5_train_keys = ["Z", "R", "F"] md5_valid_keys = ["Z", "R", "F"] if "E" in train_dataset.keys(): md5_train_keys.append("E") if "E" in valid_dataset.keys(): md5_valid_keys.append("E") md5_train = md5_data(train_dataset, md5_train_keys) md5_valid = md5_data(valid_dataset, md5_valid_keys) log.info("\n# Training #") log.info("MD5 : %s", md5_train) log.info("Size : %d", n_train) if idxs_train is None: log.info("Drawing structures from the dataset") if "E" in train_dataset: log.info( "Energies are included in the dataset\n" "Using the Freedman-Diaconis rule" ) idxs_train = draw_strat_sample(train_dataset["E"], n_train) else: log.info( "Energies are not included in the dataset\n" "Randomly selecting structures" ) idxs_train = np.random.choice( np.arange(train_dataset["F"].shape[0]), n_train, replace=False, ) else: log.info("Training indices were manually specified") idxs_train = np.array(idxs_train) log.log_array(idxs_train, level=10) # Handles validation indices. log.info("\n# Validation #") log.info("MD5 : %s", md5_valid) log.info("Size : %d", n_valid) if idxs_valid is not None: log.info("Validation indices were manually specified") idxs_valid = np.array(idxs_valid) log.log_array(idxs_valid, level=10) else: log.info("Drawing structures from the dataset") excl_idxs = ( idxs_train if md5_train == md5_valid else np.array([], dtype=np.uint) ) log.debug("Excluded %r structures", len(excl_idxs)) log.log_array(excl_idxs, level=10) if "E" in valid_dataset: idxs_valid = draw_strat_sample( valid_dataset["E"], n_valid, excl_idxs=excl_idxs, ) else: idxs_valid_cands = np.setdiff1d( np.arange(valid_dataset["F"].shape[0]), excl_idxs, assume_unique=True, ) idxs_valid = np.random.choice(idxs_valid_cands, n_valid, replace=False) ### mbGDML CHANGE END ### R_train = train_dataset["R"][idxs_train, :, :] task = { "type": "t", "code_version": mbgdml_version, "dataset_name": train_dataset["name"].astype(str), "dataset_theory": train_dataset["theory"].astype(str), "Z": train_dataset["Z"], "R_train": R_train, "F_train": train_dataset["F"][idxs_train, :, :], "idxs_train": idxs_train, "md5_train": md5_train, "idxs_valid": idxs_valid, "md5_valid": md5_valid, "sig": sig, "lam": lam, "use_E": use_E, "use_E_cstr": use_E_cstr, "use_sym": use_sym, "use_cprsn": use_cprsn, "solver_name": solver, "solver_tol": solver_tol, } if use_E: task["E_train"] = train_dataset["E"][idxs_train] lat_and_inv = None if "lattice" in train_dataset: log.info("\nLattice was found in the dataset") log.debug(train_dataset["lattice"]) task["lattice"] = train_dataset["lattice"] try: lat_and_inv = (task["lattice"], np.linalg.inv(task["lattice"])) except np.linalg.LinAlgError as e: raise ValueError( "Provided dataset contains invalid lattice vectors " "(not invertible). Note: Only rank 3 lattice vector matrices " "are supported." ) from e if "r_unit" in train_dataset and "e_unit" in train_dataset: log.info("\nCoordinate unit : %s", train_dataset["r_unit"]) log.info("Energy unit : %s", train_dataset["e_unit"]) task["r_unit"] = train_dataset["r_unit"] task["e_unit"] = train_dataset["e_unit"] if use_sym: # No permutations provided externally. if perms is None: if ( "perms" in train_dataset ): # take perms from training dataset, if available n_perms = train_dataset["perms"].shape[0] log.info("Using %d permutations included in dataset", n_perms) task["perms"] = train_dataset["perms"] else: # find perms from scratch n_train = R_train.shape[0] R_train_sync_mat = R_train if n_train > 1000: R_train_sync_mat = R_train[ np.random.choice(n_train, 1000, replace=False), :, : ] log.info( "Symmetry search has been restricted to a random subset of " "1000/%d training points for faster convergence.", n_train, ) # TODO: PBCs disabled when matching (for now). task["perms"] = find_perms( R_train_sync_mat, train_dataset["Z"], # lat_and_inv=None, lat_and_inv=lat_and_inv, max_processes=self._max_processes, ) else: # use provided perms n_atoms = len(task["Z"]) n_perms, perms_len = perms.shape if perms_len != n_atoms: raise ValueError( # TODO: Document me "Provided permutations do not match the number of atoms in " "dataset." ) log.info("Using %d externally provided permutations", n_perms) task["perms"] = perms else: task["perms"] = np.arange(train_dataset["R"].shape[1])[ None, : ] # no symmetries return task
[docs] def create_model( self, task, solver, R_desc, R_d_desc, tril_perms_lin, std, alphas_F, alphas_E=None, ): r"""Create a data structure, :obj:`dict`, of custom type ``model``. These data structures contain the trained model are everything that is needed to generate predictions for new inputs. Each task also contains the MD5 fingerprints of the used datasets. Parameters ---------- task : :obj:`dict` Data structure of custom type ``task`` from which the model emerged. solver : :obj:`str` Identifier string for the solver that has been used to train this model. R_desc : :obj:`numpy.ndarray`, ndim: ``2`` An array of size :math:`M \\times D` containing the descriptors of dimension :math:`D` for :math:`M` molecules. R_d_desc : :obj:`numpy.ndarray`, ndim: ``2`` An array of size :math:`M \\times D \\times 3N` containing of the descriptor Jacobians for :math:`M` molecules. The descriptor has dimension :math:`D` with :math:`3N` partial derivatives with respect to the :math:`3N` Cartesian coordinates of each atom. tril_perms_lin : :obj:`numpy.ndarray`, ndim: ``1`` An array containing all recovered permutations expanded as one large permutation to be applied to a tiled copy of the object to be permuted. std : :obj:`float` Standard deviation of the training labels. alphas_F : :obj:`numpy.ndarray`, ndim: ``1`` An array of size :math:`3NM` containing of the linear coefficients that correspond to the force constraints. alphas_E : :obj:`numpy.ndarray`, ndim: ``1``, optional An array of size N containing of the linear coefficients that correspond to the energy constraints. Only used if ``use_E_cstr`` is ``True``. Returns ------- :obj:`dict` Data structure of custom type ``model``. """ # pylint: disable=invalid-name _, dim_d = R_d_desc.shape[:2] n_atoms = int((1 + np.sqrt(8 * dim_d + 1)) / 2) desc = Desc( n_atoms, max_processes=self._max_processes, ) dim_i = desc.dim_i R_d_desc_alpha = desc.d_desc_dot_vec(R_d_desc, alphas_F.reshape(-1, dim_i)) model = { "type": "m", "code_version": mbgdml_version, "dataset_name": task["dataset_name"], "dataset_theory": task["dataset_theory"], "solver_name": solver, "Z": task["Z"], "idxs_train": task["idxs_train"], "md5_train": task["md5_train"], "idxs_valid": task["idxs_valid"], "md5_valid": task["md5_valid"], "n_test": 0, "md5_test": None, "f_err": {"mae": np.nan, "rmse": np.nan}, "R_desc": R_desc.T, "R_d_desc_alpha": R_d_desc_alpha, "c": 0.0, "std": std, "sig": task["sig"], "lam": task["lam"], "alphas_F": alphas_F, "perms": task["perms"], "tril_perms_lin": tril_perms_lin, "use_E": task["use_E"], "use_cprsn": task["use_cprsn"], } if task["use_E"]: model["e_err"] = {"mae": np.nan, "rmse": np.nan} if task["use_E_cstr"]: model["alphas_E"] = alphas_E if "lattice" in task: model["lattice"] = task["lattice"] if "r_unit" in task and "e_unit" in task: model["r_unit"] = task["r_unit"] model["e_unit"] = task["e_unit"] return model
[docs] def train_labels(self, F, use_E, use_E_cstr, E=None): r"""Compute custom train labels. By default, they are the raveled forces scaled by the standard deviation. Energy labels are included if ``use_E`` and ``use_E_cstr`` are ``True``. .. note:: We use negative energies with the mean removed for training labels (if requested). Parameters ---------- F : :obj:`numpy.ndarray`, ndim: ``3`` Train set forces. use_E : :obj:`bool` If energies are used during training. use_E_cstr : :obj:`bool` If energy constraints are being added to the kernel. E : :obj:`numpy.ndarray`, default: :obj:`None` Train set energies. Returns ------- :obj:`numpy.ndarray` Train labels including forces and possibly energies. :obj:`float` Standard deviation of training labels. :obj:`float` Mean energy. If energy labels are not included then this is :obj:`None`. """ # pylint: disable=invalid-name y = F.ravel().copy() if use_E and use_E_cstr: E_train = E.ravel().copy() E_train_mean = np.mean(E_train) y = np.hstack((y, -E_train + E_train_mean)) else: E_train_mean = None y_std = np.std(y) y /= y_std return y, y_std, E_train_mean
# pylint: disable-next=invalid-name
[docs] def train(self, task, require_E_eval=False): r"""Train a model based on a task. Parameters ---------- task : :obj:`dict` Data structure of custom type ``task`` from :meth:`~mbgdml._gdml.train.GDMLTrain.create_task`. require_E_eval : :obj:`bool`, default: ``False`` Require energy evaluation. Returns ------- :obj:`dict` Data structure of custom type ``model`` from :meth:`~mbgdml._gdml.train.GDMLTrain.create_model`. Raises ------ ValueError If the provided dataset contains invalid lattice vectors. """ # pylint: disable=invalid-name task = dict(task) # make mutable n_train, n_atoms = task["R_train"].shape[:2] desc = Desc( n_atoms, max_processes=self._max_processes, ) n_perms = task["perms"].shape[0] tril_perms = np.array([desc.perm(p) for p in task["perms"]]) # dim_i = 3 * n_atoms # Unused variable dim_d = desc.dim perm_offsets = np.arange(n_perms)[:, None] * dim_d tril_perms_lin = (tril_perms + perm_offsets).flatten("F") lat_and_inv = None if "lattice" in task: try: lat_and_inv = (task["lattice"], np.linalg.inv(task["lattice"])) except np.linalg.LinAlgError as e: raise ValueError( "Provided dataset contains invalid lattice vectors (not " "invertible). Note: Only rank 3 lattice vector matrices are " "supported." ) from e R = task["R_train"].reshape(n_train, -1) R_desc, R_d_desc = desc.from_R(R, lat_and_inv=lat_and_inv) # Generate label vector. if task["use_E"]: E_train = task["use_E"] else: E_train = None y, y_std, E_train_mean = self.train_labels( task["F_train"], task["use_E"], task["use_E_cstr"], E=E_train ) max_memory_bytes = self._max_memory * 1024**3 # Memory cost of analytic solver est_bytes_analytic = Analytic.est_memory_requirement(n_train, n_atoms) # Memory overhead (solver independent) est_bytes_overhead = y.nbytes est_bytes_overhead += R.nbytes est_bytes_overhead += R_desc.nbytes est_bytes_overhead += R_d_desc.nbytes solver_keys = {} use_analytic_solver = ( est_bytes_analytic + est_bytes_overhead ) < max_memory_bytes ### mbGDML CHANGE START ### if task["solver_name"] is None: # Fall back to analytic solver use_analytic_solver = True else: solver = task["solver_name"] assert solver in ("analytic", "iterative") use_analytic_solver = bool(solver == "analytic") ### mbGDML CHANGE END ### if use_analytic_solver: mem_req_mb = (est_bytes_analytic + est_bytes_overhead) * 1e-6 # MB log.info( "Using analytic solver (expected memory requirement: ~%.2f MB)", mem_req_mb, ) ### mbGDML CHANGE START ### analytic = Analytic(self, desc) alphas = analytic.solve(task, R_desc, R_d_desc, tril_perms_lin, y) solver_keys["norm_y_train"] = np.linalg.norm(y) ### mbGDML CHANGE END ### else: # Iterative solver max_n_inducing_pts = Iterative.max_n_inducing_pts( n_train, n_atoms, max_memory_bytes ) est_bytes_iterative = Iterative.est_memory_requirement( n_train, max_n_inducing_pts, n_atoms ) mem_req_mb = (est_bytes_iterative + est_bytes_overhead) * 1e-6 # MB log.info( "Using iterative solver (expected memory requirement: ~%.2f MB)", mem_req_mb, ) alphas_F = task["alphas0_F"] if "alphas0_F" in task else None alphas_E = task["alphas0_E"] if "alphas0_E" in task else None iterative = Iterative( self, desc, self._max_memory, self._max_processes, self._use_torch, ) ( alphas, solver_keys["solver_tol"], solver_keys[ "solver_iters" ], # number of iterations performed (cg solver) solver_keys["solver_resid"], # residual of solution _, # train_rmse solver_keys["inducing_pts_idxs"], is_conv, ) = iterative.solve( task, R_desc, R_d_desc, tril_perms_lin, y, y_std, ) solver_keys["norm_y_train"] = np.linalg.norm(y) if not is_conv: log.warning("Iterative solver did not converge!") log.info("Troubleshooting tips:") log.info( "(1) Are the provided geometries highly correlated (i.e. very " "similar to each other)?" ) log.info("(2) Try a larger length scale (sigma) parameter.") log.warning( "We will continue with this unconverged model, but its accuracy " "will likely be very bad." ) alphas_E = None alphas_F = alphas if task["use_E_cstr"]: alphas_E = alphas[-n_train:] alphas_F = alphas[:-n_train] model = self.create_model( task, "analytic" if use_analytic_solver else "cg", R_desc, R_d_desc, tril_perms_lin, y_std, alphas_F, alphas_E=alphas_E, ) model.update(solver_keys) # Recover integration constant. # Note: if energy constraints are included in the kernel (via 'use_E_cstr'), # do not compute the integration constant, but simply set it to the mean of # the training energies (which was subtracted from the labels before training). if model["use_E"]: c = ( self._recov_int_const( model, task, R_desc=R_desc, R_d_desc=R_d_desc, require_E_eval=require_E_eval, ) if E_train_mean is None else E_train_mean ) model["c"] = c return model
[docs] def _recov_int_const( self, model, task, R_desc=None, R_d_desc=None, require_E_eval=False ): r"""Estimate the integration constant for a force field model. The offset between the energies predicted for the original training data and the true energy labels is computed in the least square sense. Furthermore, common issues with the user-provided datasets are self diagnosed here. Parameters ---------- model : :obj:`dict` Data structure of custom type ``model``. task : :obj:`dict` Data structure of custom type ``task``. R_desc : :obj:`numpy.ndarray`, ndim: ``2``, optional An array of size :math:`M \\times D` containing the descriptors of dimension :math:`D` for :math:`M` molecules. R_d_desc : :obj:`numpy.ndarray`, ndim: ``2``, optional An array of size :math:`M \\times D \\times 3N` containing of the descriptor Jacobians for :math:`M` molecules. The descriptor has dimension :math:`D` with :math:`3N` partial derivatives with respect to the :math:`3N` Cartesian coordinates of each atom. require_E_eval : :obj:`bool`, default: ``False`` Force the computation and return of the integration constant regardless if there are significant errors. Returns ------- :obj:`float` Estimate for the integration constant. Raises ------ ValueError If the sign of the force labels in the dataset from which the model emerged is switched (e.g. gradients instead of forces). ValueError If inconsistent/corrupted energy labels are detected in the provided dataset. ValueError If different scales in energy vs. force labels are detected in the provided dataset. """ # pylint: disable=invalid-name gdml_predict = GDMLPredict( model, max_memory=self._max_memory, max_processes=self._max_processes, use_torch=self._use_torch, ) gdml_predict.set_R_desc(R_desc) gdml_predict.set_R_d_desc(R_d_desc) n_train = task["E_train"].shape[0] R = task["R_train"].reshape(n_train, -1) E_pred, _ = gdml_predict.predict(R) E_ref = np.squeeze(task["E_train"]) e_fact = np.linalg.lstsq( np.column_stack((E_pred, np.ones(E_ref.shape))), E_ref, rcond=-1 )[0][0] corrcoef = np.corrcoef(E_ref, E_pred)[0, 1] if not require_E_eval: if np.sign(e_fact) == -1: log.warning( "The provided dataset contains gradients instead of force labels " "(flipped sign). Please correct!\nNote: The energy prediction " "accuracy of the model will thus neither be validated nor tested " "in the following steps!" ) return None if corrcoef < 0.95: log.warning("Inconsistent energy labels detected!") log.warning( "The predicted energies for the training data are only weakly\n" "correlated with the reference labels (correlation " "coefficient %.2f)\nwhich indicates that the issue is most " "likely NOT just a unit conversion error.\n", corrcoef, ) return None if np.abs(e_fact - 1) > 1e-1: log.warning("Different scales in energy vs. force labels detected!\n") return None del gdml_predict # Least squares estimate for integration constant. return np.sum(E_ref - E_pred) / E_ref.shape[0]
[docs] def _assemble_kernel_mat( self, R_desc, R_d_desc, tril_perms_lin, sig, desc, use_E_cstr=False, col_idxs=np.s_[:], alloc_extra_rows=0, ): r"""Compute force field kernel matrix. The Hessian of the Matern kernel is used with n = 2 (twice differentiable). Each row and column consists of matrix-valued blocks, which encode the interaction of one training point with all others. The result is stored in shared memory (a global variable). Parameters ---------- R_desc : :obj:`numpy.ndarray`, ndim: ``2``, optional An array of size :math:`M \\times D` containing the descriptors of dimension :math:`D` for :math:`M` molecules. R_d_desc : :obj:`numpy.ndarray`, ndim: ``2``, optional An array of size :math:`M \\times D \\times 3N` containing of the descriptor Jacobians for :math:`M` molecules. The descriptor has dimension :math:`D` with :math:`3N` partial derivatives with respect to the :math:`3N` Cartesian coordinates of each atom. tril_perms_lin : :obj:`numpy.ndarray`, ndim: ``1`` An array containing all recovered permutations expanded as one large permutation to be applied to a tiled copy of the object to be permuted. sig : :obj:`int` Hyperparameter sigma (kernel length scale). use_E_cstr : :obj:`bool`, optional Include energy constraints in the kernel. This can sometimes be helpful in tricky cases. cols_m_limit : :obj:`int`, optional Only generate the columns up to index ``cols_m_limit``. This creates a :math:`M3N \\times` ``cols_m_limit``:math:`3N` kernel matrix, instead of :math:`M3N \\times M3N`. cols_3n_keep_idxs : :obj:`numpy.ndarray`, optional Only generate columns with the given indices in the :math:`3N \\times 3N` kernel function. The resulting kernel matrix will have dimension :math:`M3N \\times M` ``len(cols_3n_keep_idxs)``. Returns ------- :obj:`numpy.ndarray` Force field kernel matrix. """ # pylint: disable=invalid-name global glob # pylint: disable=global-variable-not-assigned n_train, dim_d = R_d_desc.shape[:2] dim_i = 3 * int((1 + np.sqrt(8 * dim_d + 1)) / 2) # Determine size of kernel matrix. K_n_rows = n_train * dim_i # Account for additional rows (and columns) due to energy constraints in the # kernel matrix. if use_E_cstr: K_n_rows += n_train if isinstance(col_idxs, slice): # indexed by slice K_n_cols = len(range(*col_idxs.indices(K_n_rows))) else: # indexed by list assert len(col_idxs) == len(set(col_idxs)) # assume no duplicate indices # Note: This function does not support unsorted (ascending) index arrays. assert np.array_equal(col_idxs, np.sort(col_idxs)) K_n_cols = len(col_idxs) # Make sure no indices are outside of the valid range. if K_n_cols > K_n_rows: raise ValueError("Columns indexed beyond range.") exploit_sym = False cols_m_limit = None # Check if range is a subset of training points (as opposed to a subset of # partials of multiple points). is_M_subset = ( isinstance(col_idxs, slice) and (col_idxs.start is None or col_idxs.start % dim_i == 0) and (col_idxs.stop is None or col_idxs.stop % dim_i == 0) and col_idxs.step is None ) if is_M_subset: M_slice_start = ( None if col_idxs.start is None else int(col_idxs.start / dim_i) ) M_slice_stop = None if col_idxs.stop is None else int(col_idxs.stop / dim_i) M_slice = slice(M_slice_start, M_slice_stop) J = range(*M_slice.indices(n_train + (n_train if use_E_cstr else 0))) if M_slice_start is None: exploit_sym = True cols_m_limit = M_slice_stop else: if isinstance(col_idxs, slice): # random = list(range(*col_idxs.indices(n_train * dim_i))) col_idxs = list(range(*col_idxs.indices(K_n_rows))) # Separate column indices of force-force and force-energy constraints. cond = col_idxs >= (n_train * dim_i) ff_col_idxs, fe_col_idxs = col_idxs[~cond], col_idxs[cond] # M - number training # N - number atoms # Column indices that go beyond force-force correlations need a different # treatment. n_idxs = np.concatenate( [np.mod(ff_col_idxs, dim_i), np.zeros(fe_col_idxs.shape, dtype=int)] ) m_idxs = np.concatenate([np.array(ff_col_idxs) // dim_i, fe_col_idxs]) m_idxs_uniq = np.unique(m_idxs) # which points to include? m_n_idxs = [ list(n_idxs[np.where(m_idxs == m_idx)]) for m_idx in m_idxs_uniq ] m_n_idxs_lens = [len(m_n_idx) for m_n_idx in m_n_idxs] m_n_idxs_lens.insert(0, 0) blk_start_idxs = list( np.cumsum(m_n_idxs_lens[:-1]) ) # index within K at which each block starts # tuples: (block index in final K, block index global, indices of partials # within block) J = list(zip(blk_start_idxs, m_idxs_uniq, m_n_idxs)) if self._use_torch: # pylint: disable=no-member, invalid-name, global-variable-undefined if not _HAS_TORCH: raise ImportError("PyTorch not found! Please install PyTorch.") K = np.empty((K_n_rows + alloc_extra_rows, K_n_cols)) if J is not list: J = list(J) global torch_assemble_done _, torch_assemble_done = K_n_cols, 0 if _TORCH_CUDA_IS_AVAILABLE: torch_device = "cuda" elif _TORCH_MPS_IS_AVAILABLE: torch_device = "mps" else: torch_device = "cpu" R_desc_torch = torch.from_numpy(R_desc).to(torch_device) # N, d R_d_desc_torch = torch.from_numpy(R_d_desc).to(torch_device) # pylint: disable-next=import-outside-toplevel from .torchtools import GDMLTorchAssemble torch_assemble = GDMLTorchAssemble( J, tril_perms_lin, sig, use_E_cstr, R_desc_torch, R_d_desc_torch, out=K[:K_n_rows, :], ) # Enable data parallelism n_gpu = torch.cuda.device_count() if n_gpu > 1: torch_assemble = torch.nn.DataParallel(torch_assemble) torch_assemble.to(torch_device) torch_assemble.forward(torch.arange(len(J))) del torch_assemble del R_desc_torch del R_d_desc_torch return K K = mp.RawArray("d", (K_n_rows + alloc_extra_rows) * K_n_cols) glob["K"], glob["K_shape"] = K, (K_n_rows + alloc_extra_rows, K_n_cols) glob["R_desc"], glob["R_desc_shape"] = _share_array(R_desc, "d") glob["R_d_desc"], glob["R_d_desc_shape"] = _share_array(R_d_desc, "d") glob["desc_func"] = desc pool = None map_func = map if self._max_processes != 1 and mp.cpu_count() > 1: pool = Pool( (self._max_processes or mp.cpu_count()) - 1 ) # exclude main process map_func = pool.imap_unordered _, done = K_n_cols, 0 for done_wkr in map_func( partial( _assemble_kernel_mat_wkr, tril_perms_lin=tril_perms_lin, sig=sig, use_E_cstr=use_E_cstr, exploit_sym=exploit_sym, cols_m_limit=cols_m_limit, ), J, ): done += done_wkr if pool is not None: pool.close() # Wait for the worker processes to terminate (to measure total runtime # correctly). pool.join() pool = None # Release some memory. glob.pop("K", None) glob.pop("R_desc", None) glob.pop("R_d_desc", None) return np.frombuffer(K).reshape((K_n_rows + alloc_extra_rows), K_n_cols)
[docs]def get_test_idxs(model, dataset, n_test=None): r"""Gets dataset indices for testing a model. Parameters ---------- model : :obj:`dict` Model to test. dataset : :obj:`dict` Dataset to be used for testing. n_test : :obj:`int`, default: :obj:`None` Number of points to include in test indices. Defaults to all available indices. Returns ------- :obj:`numpy.ndarray` Structure indices for model testing. """ # exclude training and/or test sets from validation set if necessary excl_idxs = np.empty((0,), dtype=np.uint) def convert_md5(md5_value): if isinstance(md5_value, np.ndarray): return str(md5_value.item()) return str(md5_value) md5_dset = convert_md5(dataset["md5"]) md5_train = convert_md5(model["md5_train"]) md5_valid = convert_md5(model["md5_valid"]) if md5_dset == md5_train: excl_idxs = np.concatenate([excl_idxs, model["idxs_train"]]).astype(np.uint) if md5_dset == md5_valid: excl_idxs = np.concatenate([excl_idxs, model["idxs_valid"]]).astype(np.uint) n_data = dataset["F"].shape[0] n_data_eff = n_data - len(excl_idxs) if n_test is None: n_test = n_data_eff if n_data_eff == 0: log.warning("No unused points for testing in provided dataset.") return None log.info("Test set size was automatically set to %d points.", n_data_eff) if "E" in dataset: test_idxs = draw_strat_sample(dataset["E"], n_test, excl_idxs=excl_idxs) else: test_idxs = np.delete(np.arange(n_data), excl_idxs) log.warning( "Test dataset will be sampled with no guidance from energy labels " "(randomly)!\nNote: Larger test datasets are recommended due to slower " "convergence of the error." ) return test_idxs
[docs]def add_valid_errors( model, dataset, overwrite=False, max_processes=None, use_torch=False ): r"""Calculate and add energy and force validation errors to a model. Parameters ---------- model : :obj:`dict` Trained GDML model. dataset : :obj:`dict` Validation dataset. overwrite : :obj:`bool`, default: ``False`` Will overwrite validation errors in model if they already exist. Return ------ :obj:`dict` Force and energy validation errors as arrays with keys ``'force'`` and ``'energy'``. :obj:`dict` Model with validation errors. """ log.info( "\n------------------------\n" "| Model Validation |\n" "------------------------" ) f_err = np.array(model["f_err"]).item() is_model_validated = not (np.isnan(f_err["mae"]) or np.isnan(f_err["rmse"])) if is_model_validated and not overwrite: log.warning("Model is already validated and overwrite is False.") return None _, E_errors, F_errors = model_errors( model, dataset, is_valid=True, max_processes=max_processes, use_torch=use_torch ) model["n_test"] = 0 # flag the model as not tested results = {"force": F_errors} model["f_err"] = {"mae": mae(F_errors), "rmse": rmse(F_errors)} if model["use_E"]: results["energy"] = E_errors model["e_err"] = {"mae": mae(E_errors), "rmse": rmse(E_errors)} else: results["energy"] = None return results, model
def save_model(model, model_path): np.savez_compressed(model_path, **model)
[docs]def model_errors( model, dataset, is_valid=False, n_test=None, max_processes=None, use_torch=False ): r"""Computes model errors for either validation or testing purposes. Parameters ---------- model : :obj:`dict` Trained GDML model. dataset : :obj:`dict` Dataset to test the model against. is_valid : :obj:`bool`, default: ``False`` Is this for model validation? Determines how we get the structure indices. n_test : :obj:`int`, default: :obj:`None` Number of desired testing indices. :obj:`None` will test against all available structures. Returns ------- :obj:`int` Number of structures predicted. :obj:`numpy.ndarray` (ndim: ``1``) Energy errors or :obj:`None`. :obj:`numpy.ndarray` (ndim: ``3``) Force errors. """ # pylint: disable=protected-access num_workers, batch_size = 0, 0 if not np.array_equal(model["Z"], dataset["Z"]): raise ValueError( "Atom composition or order in dataset does not match the one in model" ) if ("lattice" in model) is not ("lattice" in dataset): if "lattice" in model: raise ValueError("Model contains lattice vectors, but dataset does not.") if "lattice" in dataset: raise ValueError("Dataset contains lattice vectors, but model does not.") if is_valid: test_idxs = model["idxs_valid"] else: log.info( "\n---------------------\n" "| Model Testing |\n" "---------------------\n" ) log.log_model(model) test_idxs = get_test_idxs(model, dataset, n_test=n_test) Z = dataset["Z"] R = dataset["R"][test_idxs, :, :] F = dataset["F"][test_idxs, :, :] n_R = R.shape[0] if model["use_E"]: E = dataset["E"][test_idxs] gdml_predict = GDMLPredict(model, max_processes=max_processes, use_torch=use_torch) log.info("\nPredicting %r structures", len(test_idxs)) b_size = min(1000, len(test_idxs)) log.info("Batch size : %d structures\n", b_size) if not use_torch: log.debug("Using CPU (use_torch = False)") if num_workers == 0 or batch_size == 0: gdml_predict.prepare_parallel(n_bulk=b_size, return_is_from_cache=True) num_workers, batch_size, bulk_mp = ( gdml_predict.num_workers, gdml_predict.chunk_size, gdml_predict.bulk_mp, ) else: gdml_predict._set_num_workers(num_workers) gdml_predict._set_chunk_size(batch_size) gdml_predict._set_bulk_mp(bulk_mp) n_atoms = Z.shape[0] E_pred, F_pred = np.empty(n_R), np.empty(R.shape) t_pred = log.t_start() n_done = 0 for b_range in chunk_array(list(range(len(test_idxs))), b_size): log.info("%d done", n_done) n_done_step = len(b_range) n_done += n_done_step r = R[b_range].reshape(n_done_step, -1) e_pred, f_pred = gdml_predict.predict(r) F_pred[b_range] = f_pred.reshape((len(b_range), n_atoms, 3)) if model["use_E"]: E_pred[b_range] = e_pred log.info("%d done", n_done) del gdml_predict t_elapsed = log.t_stop(t_pred, message="\nTook {time} s") pred_rate = n_R / t_elapsed log.info("Prediction rate : %.2f structures per second", pred_rate) # Force errors log.info("Computing force (and energy) errors") F_errors = F_pred - F F_mae = mae(F_errors) F_rmse = rmse(F_errors) log.info("\nForce MAE : %.5f", F_mae) log.info("Force RMSE : %.5f", F_rmse) # Energy errors if model["use_E"]: E_errors = E_pred - E E_mae = mae(E_errors) E_rmse = rmse(E_errors) log.info("Energy MAE : %.5f", E_mae) log.info("Energy RMSE : %.5f", E_rmse) if model["use_E"]: return len(test_idxs), E_errors, F_errors return len(test_idxs), None, F_errors