Source code for SimExLite.DiffractionCalculators.SingFELDiffractionCalculator

# Copyright (C) 2022 Juncheng E
# Contact: Juncheng E <juncheng.e@xfel.eu>
# This file is part of SimEx-Lite which is released under GNU General Public License v3.
# See file LICENSE or go to <http://www.gnu.org/licenses> for full license details.

import os
from pathlib import Path
from subprocess import Popen, PIPE
import sys
import shlex
import h5py
import numpy as np
from tqdm.autonotebook import tqdm

try:
    from pysingfel.detector import Detector
    from pysingfel.beam import Beam
    from pysingfel.radiationDamage import setEnergyFromFile

    PYSINGFEL_AVAILABLE = True
except ModuleNotFoundError:
    PYSINGFEL_AVAILABLE = False
from libpyvinyl.BaseCalculator import BaseCalculator, CalculatorParameters
from libpyvinyl.BaseData import DataCollection
from SimExLite.DiffractionData import DiffractionData, SingFELFormat
from SimExLite.PMIData import XMDYNFormat
import shutil
from SimExLite.utils.Logger import setLogger

logger = setLogger("SingFELDiffractionCalculator")


[docs]class SingFELDiffractionCalculator(BaseCalculator): """Diffraction pattern calculator with pysingfel backend. Args: name (str): The name of this calculator. input (PMIData): The input data in `PMIData` class. :param output_keys: The key(s) of this calculator's output data. :param output_data_types: The data type(s), i.e., classes, of each output. It's a list of the data classes or a single data class. The available data classes are based on `BaseData`. :param output_filenames: The name(s) of the output file(s). It can be a str of a filename or a list of filenames. If the mapping is dict mapping, the name is `None`. Defaults to None. :param instrument_base_dir: The base directory for the instrument to which this calculator belongs. The final exact output file path depends on `instrument_base_dir` and `calculator_base_dir`: `instrument_base_dir`/`calculator_base_dir`/filename :param calculator_base_dir: The base directory for this calculator. The final exact output file path depends on `instrument_base_dir` and `calculator_base_dir`: `instrument_base_dir`/`calculator_base_dir`/filename :param parameters: The parameters for this calculator. """
[docs] def __init__( self, name: str, input: DataCollection, output_keys: str = "singfel_diffraction", output_data_types=DiffractionData, output_filenames: str = "diffr.h5", instrument_base_dir="./", calculator_base_dir="SingFELDiffractionCalculator", parameters=None, ): super().__init__( name, input, output_keys, output_data_types=output_data_types, output_filenames=output_filenames, instrument_base_dir=instrument_base_dir, calculator_base_dir=calculator_base_dir, parameters=parameters, )
def init_parameters(self): parameters = CalculatorParameters() uniform_rotation = parameters.new_parameter( "uniform_rotation", comment="If it's True, the orientations are fixed to ensure the SO3 space is always uniformly sampled." + " If it's False, it will be a random sampling complying a uniform distribution in the SO3 space", ) uniform_rotation.value = False calculate_Compton = parameters.new_parameter( "calculate_Compton", comment="If calculate the compton scattering." ) slice_interval = parameters.new_parameter( "slice_interval", comment="The slice interval of the pmi time frames" ) slice_index_upper = parameters.new_parameter( "slice_index_upper", comment="The upper limit of the slice index for diffraction calculation ", ) back_rotation = parameters.new_parameter( "back_rotation", comment="Before applying diffraction rotation, rotate the sample back to its original position before photon matter interaction.", ) pmi_start_ID = parameters.new_parameter( "pmi_start_ID", comment="The start ID of the pmi files" ) pmi_stop_ID = parameters.new_parameter( "pmi_stop_ID", comment="The stop ID of the pmi files" ) number_of_diffraction_patterns = parameters.new_parameter( "number_of_diffraction_patterns", comment="The number of diffraction patterns to generate per pmi file", ) pixel_size = parameters.new_parameter( "pixel_size", comment="The pixel size of the detector", unit="m" ) pixels_x = parameters.new_parameter( "pixels_x", comment="Number of pixels in x direction" ) pixels_y = parameters.new_parameter( "pixels_y", comment="Number of pixels in y direction" ) distance = parameters.new_parameter( "distance", comment="Sample to detector distance", unit="m" ) mpi_command = parameters.new_parameter( "mpi_command", comment="The mpi command to run pysingfel" ) allow_rewrite = parameters.new_parameter( "allow_rewrite", comment="Allow rewrite the ouput." ) calculate_Compton.value = False back_rotation.value = False slice_interval.value = 100 slice_index_upper.value = 1 pmi_start_ID.value = 1 pmi_stop_ID.value = 1 number_of_diffraction_patterns.value = 10 pixel_size.value = 0.001 pixels_x.value = 10 pixels_y.value = 5 distance.value = 0.13 mpi_command.value = "mpirun -n 2" allow_rewrite.value = False self.parameters = parameters def backengine(self): input_fn = self.get_input_fn() # input_dir = Path(input_fn).parent input_dir = self.__get_input_dir(input_fn) output_stem = str(Path(self.output_file_paths[0]).stem) output_dir = Path(self.output_file_paths[0]).parent / output_stem geom_file = self.get_geometry_file() # uniform_rotation = not self.parameters["random_rotation"].value uniform_rotation = self.parameters["uniform_rotation"].value calculate_Compton = self.parameters["calculate_Compton"].value slice_interval = self.parameters["slice_interval"].value number_of_slices = self.parameters["slice_index_upper"].value pmi_start_ID = self.parameters["pmi_start_ID"].value pmi_stop_ID = self.parameters["pmi_stop_ID"].value number_of_diffraction_patterns = self.parameters[ "number_of_diffraction_patterns" ].value mpi_command = self.parameters["mpi_command"].value python_command = str(sys.executable) # fmt: off if self.parameters["allow_rewrite"].value is True: output_dir.mkdir(parents=True, exist_ok=True) else: output_dir.mkdir(parents=True, exist_ok=False) err_info = "Cannot find the 'pysingfel' module, which is required to run" + "SingFELDiffractionCalculator.backengine(). Did you install it following the instruction: https://simex-lite.readthedocs.io/en/latest/backengines/pysingfel.html" try: import pysingfel except ModuleNotFoundError: raise ModuleNotFoundError(err_info) command_sequence = [python_command, pysingfel.__path__[0]+'/radiationDamageMPI.py', '--inputDir', str(input_dir), '--outputDir', str(output_dir), '--geomFile', str(geom_file), '--configFile', "/dev/null", '--backRotation', str(self.parameters["back_rotation"].value), '--uniformRotation', str(uniform_rotation), '--calculateCompton', str(calculate_Compton), '--sliceInterval', str(slice_interval), '--numSlices', str(number_of_slices), '--pmiStartID', str(pmi_start_ID), '--pmiEndID', str(pmi_stop_ID), '--numDP', str(number_of_diffraction_patterns), ] # fmt: on args = shlex.split(mpi_command) + command_sequence proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) # proc.wait() # The above one can be replaced by proc.communicate() output, err = proc.communicate() rc = proc.returncode if rc != 0: print(output.decode("ascii")) raise RuntimeError(err.decode("ascii")) saveH5(str(output_dir)) assert len(self.output_keys) == 1 key = self.output_keys[0] output_data = self.output[key] output_data.set_file(self.output_file_paths[0], SingFELFormat) return self.output def get_input_fn(self): """Make sure the data is a mapping of PMI file""" assert len(self.input) == 1 input_data = self.input.to_list()[0] if input_data.mapping_type == XMDYNFormat: input_fn = input_data.filename else: filepath = Path(self.base_dir) / "pmi_out_0000001.h5" input_data.write(str(filepath), XMDYNFormat) input_fn = str(filepath) return input_fn def get_geometry_file(self): simple_config = { "pixel_size": self.parameters["pixel_size"].value, "slow_pixels": self.parameters["pixels_y"].value, "fast_pixels": self.parameters["pixels_x"].value, } distance = self.parameters["distance"].value filename = str(Path(self.base_dir) / "singfel.geom") write_singfel_geom_file(filename, simple_config, distance) return filename def __get_input_dir(self, input_fn): """Create/get the input dir for PMI_input files""" input_path = Path(input_fn) dir_path = input_path.with_suffix("") if dir_path.exists() and dir_path.is_dir(): shutil.rmtree(dir_path) dir_path.mkdir() p = dir_path / "pmi_out_0000001.h5" p.symlink_to(input_path.resolve()) return str(dir_path)
def write_singfel_geom_file(file_name, simeple_config, distance): """Write the geom file for pysingfel""" # fmt: off panel_id_str = 0 serialization = ";panel %s\n" % (panel_id_str) serialization += "panel%s/min_fs = %d\n" % (panel_id_str, 0) serialization += "panel%s/max_fs = %d\n" % (panel_id_str, simeple_config["fast_pixels"]-1) serialization += "panel%s/min_ss = %d\n" % (panel_id_str, 0) serialization += "panel%s/max_ss = %d\n" % (panel_id_str, simeple_config["slow_pixels"]-1) # fmt: on # This is allowed to be float corner_x = -(simeple_config["fast_pixels"] - 1) / 2 corner_y = -(simeple_config["slow_pixels"] - 1) / 2 # fmt: off serialization += "panel%s/fs = %s\n" % (panel_id_str, "1.0x") serialization += "panel%s/ss = %s\n" % (panel_id_str, "1.0y") serialization += "panel%s/clen = %8.7e\n" % (panel_id_str, distance) serialization += "panel%s/res = %8.7e\n" % (panel_id_str, 1 / simeple_config["pixel_size"]) serialization += "panel%s/coffset = %8.7e\n" % (panel_id_str, 0) serialization += "panel%s/px = %d\n" % (panel_id_str, simeple_config["fast_pixels"]) serialization += "panel%s/py = %d\n" % (panel_id_str, simeple_config["slow_pixels"]) serialization += "panel%s/pix_width = %8.7e\n" % (panel_id_str, simeple_config["pixel_size"]) serialization += "panel%s/d = %8.7e\n" % (panel_id_str, distance) serialization += "panel%s/corner_x = %d\n" % (panel_id_str, corner_x) serialization += "panel%s/corner_y = %d\n" % (panel_id_str, corner_y) serialization += "\n" # fmt: on with open(file_name, "w") as fh: fh.write(serialization) def saveH5(path_to_files): """ Private method to save the object to a file. Creates links to h5 files that all contain only one pattern. :param output_path: The file where to save the object's data. :type output_path: string, default b """ # Setup new file. with h5py.File(path_to_files + ".h5", "w") as h5_outfile: # Files to read from. individual_files = [ os.path.join(path_to_files, f) for f in os.listdir(path_to_files) ] individual_files.sort() # Keep track of global parameters being linked. global_parameters = False # Loop over all individual files and link in the top level groups. for ind_file in individual_files: # Open file. with h5py.File(ind_file, "r") as h5_infile: # Links must be relative. relative_link_target = os.path.relpath( path=ind_file, start=os.path.dirname(os.path.dirname(ind_file)) ) # Link global parameters. if not global_parameters: global_parameters = True h5_outfile["params"] = h5py.ExternalLink( relative_link_target, "params" ) h5_outfile["info"] = h5py.ExternalLink(relative_link_target, "info") h5_outfile["misc"] = h5py.ExternalLink(relative_link_target, "misc") h5_outfile["version"] = h5py.ExternalLink( relative_link_target, "version" ) for key in h5_infile["data"]: # Link in the data. ds_path = "data/%s" % (key) h5_outfile[ds_path] = h5py.ExternalLink( relative_link_target, ds_path ) def get_solid_angle(geom_fn: str): """Get the solid angle array from a pysingfel geom file. Args: geom_fn (str): The geometry file in pysingfel format. Returns: ndarray: A 2D array of solid angles. """ if not PYSINGFEL_AVAILABLE: err_info = ( "Cannot find the 'pysingfel' module, which is required to run" + "SingFELDiffractionCalculator.backengine(). Did you install it following the instruction: https://simex-lite.readthedocs.io/en/latest/backengines/pysingfel.html" ) raise ModuleNotFoundError(err_info) det = Detector(geom_fn) solidAngle = np.zeros((det.py, det.px)) for ind_x in range(det.px): for ind_y in range(det.py): rx = (ind_x - det.cx) * det.pix_width ry = (ind_y - det.cy) * det.pix_height r = np.sqrt(rx**2 + ry**2) pixDist = np.sqrt(r**2 + det.d**2) ss = det.pix_width**2 / (4 * pixDist**2 + det.pix_width**2) solidAngle[ind_y, ind_x] = 4 * np.arcsin(ss) return solidAngle def get_qmap(geom_fn: str, PMI_file: str): """Get the reciprocal space magnitude map from pysingfel geom and PMI file. q=2sin(theta)/lambda Args: geom_fn (str): The geometry file in pysingfel format. PMI_file (str): The PMI file in XMDYN format. Returns: ndarray: A 2D array of qmap. """ if not PYSINGFEL_AVAILABLE: err_info = ( "Cannot find the 'pysingfel' module, which is required to run" + "SingFELDiffractionCalculator.backengine(). Did you install it following the instruction: https://simex-lite.readthedocs.io/en/latest/backengines/pysingfel.html" ) raise ModuleNotFoundError(err_info) det = Detector(geom_fn) beam = Beam(None) setEnergyFromFile(PMI_file, beam) print("Beam energy =", beam.photon_energy) det.init_dp(beam) return det.q_mod / 1e10