Source code for SimExLite.SampleData

# Copyright (C) 2021 Juncheng E
# Contact: Juncheng E <juncheng.e@xfel.eu>
# This file is part of SimEx-Lite which is released under GNU General Public License v3.
# See file LICENSE or go to <http://www.gnu.org/licenses> for full license details.
"""Sample Structure Data APIs"""
import h5py
# import ase


[docs]class MolecularDynamicsData: """Molecular Dynamics Data class :param input_path: Path to data file :type input_path: str """ def __init__(self, input_path=None): self.input_path = input_path self.file_format = self.__getFileFormat() def __getFileFormat(self) -> str: """ Check the file format. HDF5: openPMD-MD XMDYN ASCII: Handled by ASE """ if h5py.is_hdf5(self.input_path): with h5py.File(self.input_path, 'r') as h5: try: version = h5['/info/package_version'][()].decode('ascii') except KeyError: return "UNKNOWN" if version.find('XMDYN') != -1: return "XMDYN" else: return "UNKNOWN"
# %%
[docs]class xmdynData: """xmdyn Data class :param input_path: Path to XMDYN hdf5 data :type input_path: str """ def __init__(self, input_path=None): self.input_path = input_path
[docs] def getSnpGroupList(self, snp_idx=1): """Get the dataset list of a snapshot group. :param snp_idx: Snapshot index to check, defaults to 1 :type snp_idx: int :return: A list of datasets in the snapshot group :rtype: list """ return getItemList(self.input_path, 'data/' + snpName(snp_idx))
# Softlink seems not working for datasets # def softlinkData(self, ref_dset, ref_snp=1): # """Replace a dataset of a snapshot with a softlink of a # reference snapshot. # :param ref_dset: The reference dataset name. # :type ref_dset: str # :param ref_snp: The reference snapshot, defaults to 1. # :type ref_snp: int # """ # input_path = self.input_path # ref_snp_name = snpName(ref_snp) # with h5py.File(input_path, "r+") as h5_in: # snp_range = getSnpRange(input_path) # ref_snp_index = ref_snp - 1 # snp_range.pop(ref_snp_index) # # for snp_idx in tqdm(snp_range): # for snp_idx in snp_range: # snp = snpName(snp_idx) # try: # del h5_in['data'][snp][ref_dset] # except KeyError: # pass # h5_in['data'][snp][ref_dset] = h5py.SoftLink( # 'data/{}/{}'.format(ref_snp_name, ref_dset))
[docs] def replaceWithExternal(self, ref_path, ref_dsets, ref_snp=1, snp_range=None): """Replace a dataset in a snapshot range with that in another referece .h5 file. :param input_path: The input .h5 file name :type input_path: str :param ref_path: The reference .h5 file name :type ref_path: str :param ref_dsets: The reference dataset name. :type ref_dsets: str :param ref_snp: The reference snapshot, defaults to 1. :type ref_snp: int :param snp_range: The snapshot range to replace, defaults to ``None``, which takes the snapshot range in the reference .h5 file. :type snp_range: list-like """ try: iter(ref_dsets) except TypeError: ref_dsets = [ref_dsets] if not snp_range: snp_range = getSnpRange(ref_path) try: iter(snp_range) except TypeError: raise TypeError('snp_range ({}) is not iterable'.format( type(snp_range))) with h5py.File(self.input_path, "r+") as h5_in: for snp_idx in snp_range: snp = snpName(snp_idx) for ref_dset in ref_dsets: try: del h5_in['data'][snp][ref_dset] except KeyError: pass h5_in['data'][snp][ref_dset] = h5py.ExternalLink( ref_path, 'data/{}/{}'.format(snpName(ref_snp), ref_dset))
[docs]def snpName(idx): """Get the snapshot group name from an int index""" return 'snp_{0:07}'.format(idx)
[docs]def getItemList(input_path, group): """Get a list of items in one group in a hdf5 file. :param input_path: The hdf5 file of to check. :type input_path: str :param group: The group path to check. :type group: str :return: (item name, data shape) in the group :rtype: list """ with h5py.File(input_path, 'r') as h5: h5group = h5[group] items = [] for dname, ds in h5group.items(): try: items.append((dname, ds.shape)) except AttributeError: items.append((dname, 'group')) return items
[docs]def createLinkedData(input_path, link_path, snp_range=None, ref_dsets=None, ref_snp=1): """Create a hdf5 data with all items linked to an input data. A certain dataset can be replaced with that in a reference snapshot, if ``ref_snp`` and ``ref_dsets`` are provided. :param input_path: The input hdf5 file name :type input_path: str :param link_path: The hdf5 file name holding the links :type link_path: str :param snp_range: The snapshot range to replace, defaults to ``None``, which takes the snapshot range in the reference .h5 file. :type snp_range: list-like :param ref_dsets: The reference dataset name. :type ref_dsets: str :param ref_snp: The reference snapshot, defaults to 1. :type ref_snp: int """ # Set it iterable try: iter(ref_dsets) except TypeError: ref_dsets = [ref_dsets] if not snp_range: snp_range = getSnpRange(input_path) try: iter(snp_range) except TypeError: raise TypeError('snp_range ({}) is not iterable'.format( type(snp_range))) with h5py.File(link_path, "w") as h5_out: h5_out_data = h5_out.create_group('data') with h5py.File(input_path, 'r') as h5_in: for key in h5_in: if key != 'data': h5_out[key] = h5py.ExternalLink(input_path, key) key = 'data/angle' h5_out[key] = h5py.ExternalLink(input_path, key) for snp_idx in snp_range: snp = snpName(snp_idx) h5_out_data.create_group(snp) for item in h5_in['data'][snp]: if item in ref_dsets: h5_out_data[snp][item] = h5py.ExternalLink( input_path, 'data/{}/{}'.format(snpName(ref_snp), item)) else: h5_out_data[snp][item] = h5py.ExternalLink( input_path, 'data/{}/{}'.format(snp, item))
[docs]def getSnpRange(input_path): """Get the snapshot range of a .h5 file""" with h5py.File(input_path, "r") as h5_in: snp_range = [] for i in h5_in['data'].items(): group_name = i[0] if group_name != 'angle': snp_range.append(int(group_name.split('_')[1])) return snp_range
[docs]def replaceDset(input_path, ref_path, ref_dsets): """Replace/Add dataset with that in another referece .h5 file. """ try: iter(ref_dsets) except TypeError: ref_dsets = [ref_dsets] with h5py.File(input_path, "r+") as h5_in: for ref_dset in ref_dsets: try: del h5_in[ref_dset] except KeyError: pass h5_in[ref_dset] = h5py.ExternalLink(ref_path, ref_dset)