Source code for SimExLite.PropagationCalculators.WPGPropagationCalculator

""":module WPGPropagationCalculator: Module that holds the WPGPropagationCalculator class."""

from pathlib import Path
import shutil
import sys

from libpyvinyl import BaseCalculator, CalculatorParameters
from libpyvinyl.BaseData import DataCollection
from SimExLite.WavefrontData import WavefrontData, WPGFormat
from SimExLite.utils.Logger import setLogger

logger = setLogger("WPGPropagationCalculator")

# WPG (https://github.com/samoylv/WPG) is neccessary to execute the calculator,
# but it's not a hard dependency of SimExLite.
try:
    from wpg.generators import build_gauss_wavefront
    from wpg import Wavefront
    from s2e.prop import propagate_s2e

    WPG_AVAILABLE = True
except ModuleNotFoundError:
    WPG_AVAILABLE = False


[docs]class WPGPropagationCalculator(BaseCalculator): """Class representing photon propagation through X-ray optics."""
[docs] def __init__( self, name: str, input: DataCollection, output_keys: str = "WPG_wavefront", output_data_types=WavefrontData, output_filenames: str = "wavefront.h5", instrument_base_dir="./", calculator_base_dir="WPGPropagationCalculator", parameters=None, ): """ Args: name (str): The name of this calculator. input (DataCollection): The input `DataCollection` of this calculator. output_keys (str, optional): The key(s) of this calculator's output data. It's a list of `str`s or a single str. Defaults to "WPG_wavefront". output_filenames (str, optional): The output filename of this calculator. Defaults to "wavefront.h5". instrument_base_dir (str, optional): The base directory for the instrument to which this calculator belongs. Defaults to "./". The final exact output file path depends on `instrument_base_dir` and `calculator_base_dir`: `instrument_base_dir`/`calculator_base_dir`/filename. calculator_base_dir (str, optional): The base directory for this calculator. Defaults to "./". The final exact output file path depends on `instrument_base_dir` and `calculator_base_dir`: `instrument_base_dir`/`calculator_base_dir`/filename. """ if not WPG_AVAILABLE: logger.warning('Cannot find the "WPG" module, which is required to run ' 'WPGPropagationCalculator.backengine(). Is it included in PYTHONPATH?' ) super().__init__( name, input, output_keys, output_data_types=output_data_types, output_filenames=output_filenames, instrument_base_dir=instrument_base_dir, calculator_base_dir=calculator_base_dir, parameters=parameters, )
def init_parameters(self): """Initialize the calculator parameters.""" parameters = CalculatorParameters() beamline_config = parameters.new_parameter( "beamline_config_file", comment="The beamline_configfile" ) self.parameters = parameters def prep_beamline_config(self): """Copy the beamline config file to the working dir to import the beamline module.""" beamline_config_fn = self.parameters["beamline_config_file"].value Path(self.base_dir).mkdir(parents=True, exist_ok=True) dst_path = Path(self.base_dir) / "WPG_beamline.py" shutil.copyfile(beamline_config_fn, str(dst_path)) def get_input_fn(self): """Make sure the data is a mapping of WPGFormat file.""" assert len(self.input) == 1 input_data = self.input.to_list()[0] if input_data.mapping_type == WPGFormat: input_fn = input_data.filename else: filepath = Path(self.base_dir) / "input_wavefront.h5" input_data.write(str(filepath), WPGFormat) input_fn = str(filepath) return input_fn def backengine(self)->DataCollection: """Run the simulation using WPG.""" # check for WPG first if not WPG_AVAILABLE: raise ModuleNotFoundError( 'Cannot find the "WPG" module, which is required to run ' "WPGPropagationCalculator.backengine(). Is it included in PYTHONPATH?" ) if self.parameters["beamline_config_file"].value is None: Path(self.base_dir).mkdir(parents=True, exist_ok=True) simple_beamline_fn = str(Path(self.base_dir) / "simple_beamline.py") self.parameters["beamline_config_file"].value = create_simple_beamline_file(simple_beamline_fn) self.prep_beamline_config() sys.path.insert(0, self.base_dir) import WPG_beamline input_fn = self.get_input_fn() output_fn = str(Path(self.base_dir) / self.output_filenames[0]) propagate_s2e.propagate(input_fn, output_fn, WPG_beamline.get_beamline) assert len(self.output_keys) == 1 key = self.output_keys[0] output_data = self.output[key] output_data.set_file(output_fn, WPGFormat) return self.output
def create_simple_beamline_file(filename: str): """Create a simple beamline file for the default setting""" strings = """def get_beamline(): distance = 300. foc_dist = 2. import wpg.optical_elements from wpg.optical_elements import Use_PP drift0 = wpg.optical_elements.Drift(distance) lens0 = wpg.optical_elements.Lens(foc_dist, foc_dist) drift1 = wpg.optical_elements.Drift(1./(1./foc_dist-1./distance)) bl0 = wpg.Beamline() bl0.append(drift0, Use_PP(semi_analytical_treatment=1, zoom=0.50, sampling=8)) bl0.append(lens0, Use_PP()) bl0.append(drift1, Use_PP(semi_analytical_treatment=1, zoom=4.2, sampling=0.5)) return bl0""" with open(filename, "w") as fh: fh.write(strings) return filename