# Copyright (C) 2022 Juncheng E
# Contact: Juncheng E <juncheng.e@xfel.eu>
# This file is part of SimEx-Lite which is released under GNU General Public License v3.
# See file LICENSE or go to <http://www.gnu.org/licenses> for full license details.
from pathlib import Path
from subprocess import Popen, PIPE
import pkg_resources
import time
from libpyvinyl.BaseCalculator import BaseCalculator, CalculatorParameters
from libpyvinyl.BaseData import DataCollection
from SimExLite.PhotonBeamData import SimpleBeam
from SimExLite.DetectorData import DetectorData, CXIFormat
from SimExLite.SampleData import ASEFormat
from SimExLite.utils.Logger import setLogger
from .convert_sim_to_CXI import convert_to_CXI
logger = setLogger("CrystfelDiffractionCalculator")
[docs]class CrystfelDiffractionCalculator(BaseCalculator):
"""Diffraction pattern calculator with CrystFEL backend."""
[docs] def __init__(
self,
name: str,
input: DataCollection,
output_keys: str = "crystfel_diffraction",
output_data_types=DetectorData,
output_filenames: str = "diffr.cxi",
instrument_base_dir="./",
calculator_base_dir=None,
parameters=None,
):
super().__init__(
name,
input,
output_keys,
output_data_types=output_data_types,
output_filenames=output_filenames,
instrument_base_dir=instrument_base_dir,
calculator_base_dir=calculator_base_dir,
parameters=parameters,
)
def init_parameters(self):
parameters = CalculatorParameters()
geometry_fn = parameters.new_parameter(
"geometry_fn",
comment="The path of the geometry file in crystfel format (Cheetah style).",
)
number_of_diffraction_patterns = parameters.new_parameter(
"number_of_diffraction_patterns",
comment="The number of diffraction patterns to calculate",
)
number_of_diffraction_patterns.add_interval(
min_value=2, max_value=None, intervals_are_legal=True
)
random_rotation = parameters.new_parameter(
"random_rotation",
comment="If it's False, the orientation of the sample will not change."
+ " If it's True, it will be a random sampling in the SO3 space",
)
really_random = parameters.new_parameter(
"really_random",
comment="Seed the random number generator using the kernel random number generator (/dev/urandom)."
+ "This means that truly random numbers for the orientation and crystal size,"
+ "instead of the same sequence being used for each new run.",
)
beam_bandwidth = parameters.new_parameter(
"beam_bandwidth",
comment="The wavelength bandwidth, expressed as a decimal fraction applying to to wavelengths"
+ "Note: When using the two-colour or SASE spectrum, the spectrum calculation actually takes this value to be the bandwidth applying to the photon energies"
+ "instead of the wavelengths. For small bandwidths, the difference should be very small. Sorry for the horrifying inconsistency.",
)
photon_energy = parameters.new_parameter(
"photon_energy", comment="The mean photon energy", unit="eV"
)
pulse_energy = parameters.new_parameter(
"pulse_energy", comment="Total energy of the pulse", unit="joule"
)
beam_radius = parameters.new_parameter(
"beam_radius", comment="The radius of the X-ray beam", unit="meter"
)
spectrum = parameters.new_parameter(
"spectrum", comment="The spectrum of the X-ray beam"
)
spectrum.add_option(["tophat", "sase", "twocolour"], options_are_legal=True)
point_group = parameters.new_parameter(
"point_group",
comment="Use point group as the symmetry of the intensity list. E.g. '422' for Lysozyme.",
)
space_group = parameters.new_parameter(
"space_group",
comment="The space group used for sfs intensity calculation. E.g. 'P43212' for Lysozyme.",
)
intensities_resolution = parameters.new_parameter(
"intensities_resolution",
comment="The resolution used for sfs intensity calculation. The default value is 3 angstrom",
unit="angstrom",
)
intensities_fn = parameters.new_parameter(
"intensities_fn",
comment="The file name of the intensities and phases at the reciprocal lattice points. If 'None' is provided,"
+ "the intensities will be calculated by gen-sfs in the CCP4 tools with the pointgroup provided in the parameters.",
)
max_crystal_size = parameters.new_parameter(
"max_crystal_size", comment="The maximum crystal size.", unit="meter"
)
min_crystal_size = parameters.new_parameter(
"min_crystal_size", comment="The minimum crystal size.", unit="meter"
)
gaussian_background = parameters.new_parameter(
"gaussian_background",
comment="If the parameter is [100, 5], a background complying a Gaussian distribution with mean = 100 and standard deviation = 5 will be added to the patterns."
+ "The unit is eV",
)
gpu = parameters.new_parameter(
"gpu", comment="If the GPU is enabled for calculation."
)
random_rotation.value = True
really_random.value = True
number_of_diffraction_patterns.value = 2
beam_bandwidth.value = 0.01
photon_energy.value = 9.3e3
pulse_energy.value = 2e-3
gaussian_background.value = [0.0, 0.0]
# With the file, one doesn't need to provide the following parameters
intensities_fn.value = None
# SFS intensity calculation
intensities_resolution.value = 3.0
space_group.value = None
point_group.value = None
geometry_fn.value = None
beam_radius.value = 5e-6
max_crystal_size.value = 5e-8
min_crystal_size.value = 5e-8
gpu.value = False
spectrum.value = "tophat"
self.parameters = parameters
def backengine(self, is_convert_to_cxi: bool = True):
"""If `is_convert_to_cxi` is False, for debugging, will not convert the result to CXI."""
input_fn = self.get_input_fn()
output_fn = self.output_file_paths[0]
tmp_dir_path = Path(self.base_dir) / "diffr"
tmp_dir_path.mkdir(parents=True, exist_ok=True)
fn_prefix = "diffr_out"
tmp_output = str(tmp_dir_path / fn_prefix)
param = self.parameters
assert len(self.output_file_paths) == 1
assert param["point_group"].value is not None
geometry_fn = self.__get_geometry_file()
intensities_fn = self.__get_intensities_file(input_fn)
# These two noise settings are only for converting to CXI format
noise_base = param["gaussian_background"].value[0]
noise_std = param["gaussian_background"].value[1]
# fmt: off
command_sequence = ['pattern_sim',
'-p', input_fn,
'--geometry', geometry_fn,
'--output', tmp_output,
'--number', str(param["number_of_diffraction_patterns"].value),
'--beam-bandwidth', str(param["beam_bandwidth"].value),
'--nphotons', str(self.__get_n_photons()),
'--beam-radius', str(param["beam_radius"].value),
'--spectrum', param["spectrum"].value,
'--intensities', intensities_fn,
'--min-size', str(param["min_crystal_size"].value*1e9),
'--max-size', str(param["max_crystal_size"].value*1e9),
'-y', str(param["point_group"].value)
]
# fmt: on
if param["random_rotation"].value is True:
command_sequence += ["--random-orientation"]
if param["really_random"].value is True:
command_sequence += ["--really-random"]
if param["gpu"].value is True:
command_sequence += ["--gpu"]
# print(*command_sequence)
# # Executing:
try:
proc = Popen(command_sequence, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except FileNotFoundError as e:
if "pattern_sim" in str(e):
raise RuntimeError(
"pattern_sim not found, please install crystfel: https://www.desy.de/~twhite/crystfel/manual-pattern_sim.html"
)
else:
raise
while proc.poll() is None:
# Here the output of crystfel is in stderr
output = proc.stderr.readline()
if output:
logger.info(output.decode("ascii").strip())
# t0 = time.process_time()
output, err = proc.communicate()
# t1 = time.process_time()
# print("Time spent", t1 - t0)
rc = proc.returncode
if rc != 0:
print(output.decode("ascii"))
raise RuntimeError(err.decode("ascii"))
# Save in CXI format
vds_ref_geom = pkg_resources.resource_filename(
"SimExLite.DiffractionCalculators.convert_to_cxi_geoms",
"agipd_2120_vds.geom",
)
# This is needed to convert the CXI script, not relevent to the geom used for crystfel simulation.
sim_geom = pkg_resources.resource_filename(
"SimExLite.DiffractionCalculators.convert_to_cxi_geoms",
"detector_sim.geom",
)
# print(vds_ref_geom)
# print(sim_geom)
if is_convert_to_cxi:
logger.info(f'Writting in CXI format to "{output_fn}" ...')
convert_to_CXI(
sim_geom,
vds_ref_geom,
str(tmp_dir_path),
output_fn,
f"{fn_prefix}.(\\d+).h5",
noise_base,
noise_std,
)
assert len(self.output_keys) == 1
key = self.output_keys[0]
output_data = self.output[key]
output_data.set_file(self.output_file_paths[0], CXIFormat)
logger.info(f"Done")
return self.output
def get_input_fn(self):
"""Make sure the data is sample data"""
assert len(self.input) == 1
input_data = self.input.to_list()[0]
if input_data.mapping_type == ASEFormat:
if Path(input_data.filename).suffix.lower() == ".pdb":
# Keep in .pdb format
input_fn = input_data.filename
else:
# Convert to .pdb format
input_fn = str(Path(input_data.filename).with_suffix(".pdb"))
input_data.write(input_fn, ASEFormat)
else:
# If it's in dict format
filepath = Path(self.base_dir) / "sample.pdb"
input_data.write(str(filepath), ASEFormat)
input_fn = str(filepath)
return input_fn
def __get_n_photons(self):
photon_energy = self.parameters["photon_energy"].value
pulse_energy = self.parameters["pulse_energy"].value
beam = SimpleBeam(photon_energy=photon_energy, pulse_energy=pulse_energy)
return float(beam.get_photons_per_pulse())
def __get_geometry_file(self):
"""Get the geometry file for crystfel simulation."""
if self.parameters["geometry_fn"].value is None:
# geom_path = Path(diffcalc.__file__).with_name("agipd_simple_2d.geom")
geom_path = pkg_resources.resource_filename(
"SimExLite.DiffractionCalculators", "agipd_simple_2d.geom"
)
else:
geom_path = self.parameters["geometry_fn"].value
return geom_path
def __get_intensities_file(self, input_fn):
if self.parameters["intensities_fn"].value is None:
if self.parameters["space_group"].value is not None:
logger.info(
"Intensities list file was not provided, will calculate them with the space_group, point_group and intensities_resolution parameters..."
)
space_group = self.parameters["space_group"].value
point_group = self.parameters["point_group"].value
resolution = self.parameters["intensities_resolution"].value
input_fn = str(Path(input_fn).resolve())
command_sequence = [
"gen-sfs",
input_fn,
str(space_group),
str(resolution),
str(point_group),
]
# print(command_sequence)
run_dir = Path.cwd() / self.base_dir
# # Executing:
proc = Popen(
command_sequence,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
cwd=str(run_dir),
)
while proc.poll() is None:
output = proc.stdout.readline()
if output:
logger.info(output.decode("ascii").strip())
# t0 = time.process_time()
output, err = proc.communicate()
# t1 = time.process_time()
# print("Time spent", t1 - t0)
rc = proc.returncode
if rc != 0:
if "sfall: command not found" in err.decode("ascii"):
err_txt = err.decode("ascii")
err_txt += "\nsfall not found, please install CCP4 (https://www.ccp4.ac.uk/html/index.html)."
raise RuntimeError(err_txt)
else:
print(output.decode("ascii"))
raise RuntimeError(err.decode("ascii"))
logger.info(f"Save .hkl file in {input_fn}.hkl")
return input_fn + ".hkl"
else:
err = "Intensity information is not provided.\n"
err += 'Please set either parameters["intensities_fn"] or parameters["space_group"].'
raise KeyError(err)
elif self.parameters["space_group"].value is not None:
err = 'parameters["intensities_fn"] and parameters["space_group"] cannot be set at the same time, '
err += 'please set either "intensities_fn" or "space_group".'
raise KeyError(err)
else:
fn = self.parameters["intensities_fn"].value
return fn