Source code for SimulationFramework.Framework

"""
Simframe Framework Module

The main class for handling the tracking of a particle distribution through a lattice.

Settings files can be loaded in, consisting of one or more :ref:`MasterLattice` YAML files. This creates
:class:`~SimulationFramework.Framework_objects.frameworkLattice` objects, each of which contains
:class:`~SimulationFramework.Framework_objects.frameworkElement` objects.

These objects can be modified directly through the :class:`~SimulationFramework.Framework.Framework` class.

Based on the tracking code(s) provided to the framework, the particle distribution is tracked through the lattice
sequentially, and output beam distributions are generated and converted to the standard SimFrame HDF5 format.

Summary files containing Twiss parameters, and a summary of the beam files, are generated after tracking.

Classes:
    - :class:`~SimulationFramework.Framework.Framework`: Top-level class for loading and modifying lattice
    settings and tracking through them

    - :class:`~SimulationFramework.Framework.frameworkDirectory`: Class to load a tracking run from a directory,
    read its Beam and Twiss files, and make them available.
"""

import os
import sys
import yaml
import inspect
from typing import Any, Dict
from pprint import pprint
import numpy as np
from copy import deepcopy

from .Modules.merge_two_dicts import merge_two_dicts
from .Modules import Beams as rbf
from .Modules import Twiss as rtf
from .Modules import constants
from .Codes import Executables as exes
from .Codes.Generators.Generators import (
    ASTRAGenerator,
    GPTGenerator,
    generator_keywords,
    frameworkGenerator,
)
from .Framework_objects import runSetup, frameworkElement
from . import Framework_lattices as frameworkLattices
from . import Framework_elements as frameworkElements
from .Framework_Settings import FrameworkSettings
from .FrameworkHelperFunctions import (
    _rotation_matrix,
    clean_directory,
    convert_numpy_types,
)
from pydantic import (
    BaseModel,
    field_validator,
    ConfigDict,
)
from warnings import warn

# Locate the optional MasterLattice package. The location is the package
# directory with a trailing "/", or None when the package cannot be imported
# or exposes no __file__.
try:
    import MasterLattice  # type: ignore

    if MasterLattice.__file__ is not None:
        MasterLatticeLocation = os.path.dirname(MasterLattice.__file__) + "/"
    else:
        MasterLatticeLocation = None
except ImportError:
    MasterLatticeLocation = None
# Locate the optional SimCodes package in the same way; None when it cannot be imported.
try:
    import SimCodes  # type: ignore

    SimCodesLocation = os.path.dirname(SimCodes.__file__) + "/"
except ImportError:
    SimCodesLocation = None
# Plotting support is optional: record whether the plotting module imported.
try:
    import SimulationFramework.Modules.plotting.plotting as groupplot

    use_matplotlib = True
except ImportError as e:
    print("Import error - plotting disabled. Missing package:", e)
    use_matplotlib = False
from tqdm import tqdm

# YAML tag for mappings, taken from PyYAML's base resolver.
_mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG


def dict_representer(dumper, data):
    """
    Represent a mapping through the dumper's ``represent_dict``.

    Parameters
    ----------
    dumper:
        Object providing a ``represent_dict`` method
    data:
        Mapping whose ``items()`` are handed to the dumper

    Returns
    -------
    Whatever ``dumper.represent_dict`` returns
    """
    pairs = list(data.items())
    return dumper.represent_dict(iter(pairs))
def dict_constructor(loader, node):
    """
    Build a plain ``dict`` from a mapping node.

    Parameters
    ----------
    loader:
        Object providing a ``construct_pairs`` method
    node:
        Node passed through to ``loader.construct_pairs``

    Returns
    -------
    dict
        Dictionary built from the key/value pairs the loader produced
    """
    pairs = loader.construct_pairs(node)
    return dict(pairs)
# Route plain dictionaries and YAML mappings through the two helpers above.
yaml.add_representer(dict, dict_representer)
yaml.add_constructor(_mapping_tag, dict_constructor)

# Every class object found in the lattice module.
latticeClasses = [
    member
    for _, member in inspect.getmembers(
        sys.modules["SimulationFramework.Framework_lattices"], inspect.isclass
    )
]

# Attribute names filtered out when a lattice is written by ``save_lattice``.
disallowed = [
    "allowedkeywords",
    "conversion_rules_elegant",
    "conversion_rules_ocelot",
    "objectdefaults",
    "global_parameters",
    "objectname",
    "subelement",
    "beam",
]

# Attribute names ignored when ``detect_changes`` compares elements.
disallowed_changes = [
    "allowedkeywords",
    "conversion_rules_elegant",
    "conversion_rules_ocelot",
    "objectdefaults",
    "global_parameters",
    "beam",
    "field_definition",
    "wakefield_definition",
]

# Prefix (text before "Lattice") of every name in the lattice module whose
# lower-cased form contains "lattice".
supported_codes = [
    attr_name.split("Lattice")[0]
    for attr_name in dir(frameworkLattices)
    if "lattice" in attr_name.lower()
]
class Framework(BaseModel):
    """
    The main class for handling the tracking of a particle distribution through a lattice.

    Settings files can be loaded in, consisting of one or more :ref:`MasterLattice` YAML files. This creates
    :class:`~SimulationFramework.Framework_objects.frameworkLattice` objects, each of which contains
    :class:`~SimulationFramework.Framework_objects.frameworkElement` objects.

    These objects can be modified directly through the :class:`~SimulationFramework.Framework.Framework` class.

    Based on the tracking code(s) provided to the framework, the particle distribution is tracked through the lattice
    sequentially, and output beam distributions are generated and converted to the standard SimFrame HDF5 format.

    Summary files containing Twiss parameters, and a summary of the beam files, are generated after tracking.
    """

    model_config = ConfigDict(
        extra="allow",
        arbitrary_types_allowed=True,
        validate_assignment=True,
    )

    directory: str
    """The directory into which simulation files will be placed"""

    master_lattice: str | None = None
    """Location of the master lattice files. If the package is installed, this will be configured automatically"""

    simcodes: str | None = None
    """Location of the simulation codes directory.
    If the package is installed, this will be configured automatically"""

    overwrite: bool | None = None
    """Flag to indicate whether existing files are to be overwritten #TODO deprecated?"""

    runname: str = "CLARA_240"
    """Name of the run for this setup #TODO deprecated?"""

    clean: bool = False
    """Flag to indicate whether all files in the existing directory are to be removed"""

    verbose: bool = True
    """Flag to indicate whether status updates should be printed during tracking"""

    sddsindex: int = 0
    """Index for SDDS files"""

    delete_output_files: bool = False
    """Flag to indicate whether output files are to be deleted after tracking"""

    global_parameters: Dict = {}
    """Dictionary containing global parameters accessible to all classes"""

    elementObjects: Dict = {}
    """Dictionary containing all :class:`~SimulationFramework.Framework_objects.frameworkElement` objects"""

    latticeObjects: Dict = {}
    """Dictionary containing all :class:`~SimulationFramework.Framework_objects.frameworkLattice` objects"""

    commandObjects: Dict = {}
    """Dictionary containing all :class:`~SimulationFramework.Framework_objects.frameworkCommand` objects"""

    groupObjects: Dict = {}
    """Dictionary containing all :class:`~SimulationFramework.Framework_objects.frameworkGroup` objects"""

    fileSettings: Dict = {}
    """Dictionary containing all file settings"""

    globalSettings: Dict = {}
    """Dictionary containing all global settings"""

    generatorSettings: Dict = {}
    """Dictionary containing all generator settings"""

    original_elementObjects: Dict = {}
    """Dictionary containing all :class:`~SimulationFramework.Framework_objects.frameworkElement` objects
    before changes are made"""

    progress: int | float = 0
    """Current progress of tracking"""

    tracking: bool = False
    """Flag to indicate whether the Framework is tracking"""

    basedirectory: str = ""
    """Current working directory"""

    filedirectory: str = ""
    """Directory for files"""

    generator: frameworkGenerator | None = None
    """The :class:`~SimulationFramework.Codes.Generators.Generators.frameworkGenerator` object"""

    settings: FrameworkSettings | None = None
    """Settings for the lattice"""

    settingsFilename: str | None = None
    """Filename containing lattice settings"""

    def model_post_init(self, __context):
        """
        Finish construction once the declared fields have been populated.

        Builds :attr:`~global_parameters` (beam object, GPT licence, tracking-file
        deletion flag, ASTRA WSL setting), sets the working sub-directory and the
        MasterLattice / SimCodes locations, creates the executables helper and
        exposes its command-definition methods on this object.
        """
        # Both settings come from the environment, with defaults when unset.
        gptlicense = os.environ.get("GPTLICENSE", "")
        astra_use_wsl = os.environ.get("WSL_ASTRA", 1)
        self.global_parameters = {
            "beam": rbf.beam(sddsindex=self.sddsindex),
            "GPTLICENSE": gptlicense,
            "delete_tracking_files": self.delete_output_files,
            "astra_use_wsl": astra_use_wsl,
        }
        self.setSubDirectory(self.directory)
        self.setMasterLatticeLocation(self.master_lattice)
        self.setSimCodesLocation(self.simcodes)
        self.executables = exes.Executables(self.global_parameters)
        # Shortcuts to the command-definition methods of the executables helper.
        self.defineASTRACommand = self.executables.define_astra_command
        self.defineElegantCommand = self.executables.define_elegant_command
        self.defineASTRAGeneratorCommand = (
            self.executables.define_ASTRAgenerator_command
        )
        self.defineCSRTrackCommand = self.executables.define_csrtrack_command
        self.define_gpt_command = self.executables.define_gpt_command
        # object encoding settings for simulations with multiple runs
        self.runSetup = runSetup()

    @field_validator("basedirectory", mode="before")
    @classmethod
    def validate_base_directory(cls, value: str) -> str:
        """Return `value` if it names an existing directory, else the current working directory."""
        if len(value) > 0 and os.path.isdir(value):
            return value
        return os.getcwd()

    @field_validator("filedirectory", mode="before")
    @classmethod
    def validate_file_directory(cls, value: str) -> str:
        """Return `value` if it names an existing directory, else the directory containing this module."""
        if len(value) > 0 and os.path.isdir(value):
            return value
        return os.path.dirname(os.path.abspath(__file__))

    def __repr__(self) -> str:
        """Return the repr of a dict holding the master lattice location, sub-directory and settings filename."""
        return repr(
            {
                "master_lattice_location": self.global_parameters[
                    "master_lattice_location"
                ],
                "subdirectory": self.subdirectory,
                "settingsFilename": self.settingsFilename,
            }
        )
def clear(self) -> None:
    """
    Reset :attr:`~elementObjects`, :attr:`~latticeObjects`, :attr:`~commandObjects` and
    :attr:`~groupObjects` to empty dictionaries.
    """
    for container in (
        "elementObjects",
        "latticeObjects",
        "commandObjects",
        "groupObjects",
    ):
        setattr(self, container, {})
def change_subdirectory(self, *args, **kwargs) -> None:
    """
    Alias for :func:`~setSubDirectory`: every positional and keyword argument is
    forwarded unchanged.
    """
    set_directory = self.setSubDirectory
    set_directory(*args, **kwargs)
def setSubDirectory(self, direc: str) -> None:
    """
    Point the framework at a working directory.

    The absolute form of `direc` is stored as ``subdirectory`` and as
    `master_subdir` in :attr:`~global_parameters`. A missing directory is
    created; an existing one is emptied when :attr:`~clean` is True.

    Parameters
    ----------
    direc: str
        Directory to which files will be written
    """
    target = os.path.abspath(direc)
    self.subdirectory = target
    self.global_parameters["master_subdir"] = target
    if os.path.exists(target):
        if self.clean is True:
            clean_directory(target)
        # NOTE(review): the original nesting was lost when this file was
        # flattened; the overwrite default is assumed to belong to the
        # "directory already exists" branch - confirm against upstream.
        if self.overwrite is None:
            self.overwrite = True
    else:
        os.makedirs(target, exist_ok=True)
[docs] def setMasterLatticeLocation(self, master_lattice: str | None = None) -> None: """ Set the location of the :ref:`MasterLattice` package. This then also sets the `master_lattice_location` in :attr:`~global_parameters`. Parameters ---------- master_lattice: str The full path to the MasterLattice folder """ global MasterLatticeLocation if master_lattice is None: if MasterLatticeLocation is not None: self.global_parameters["master_lattice_location"] = ( MasterLatticeLocation.replace("\\", "/") ) if self.verbose: print( "Found MasterLattice Package =", self.global_parameters["master_lattice_location"], ) elif os.path.isdir( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../../MasterLattice/MasterLattice" ) + "/" ): self.global_parameters["master_lattice_location"] = ( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../../MasterLattice/MasterLattice" ) + "/" ).replace("\\", "/") if self.verbose: print( "Found MasterLattice Directory 2-up =", self.global_parameters["master_lattice_location"], ) elif os.path.isdir( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../MasterLattice/MasterLattice" ) + "/" ): self.global_parameters["master_lattice_location"] = ( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../MasterLattice/MasterLattice" ) + "/" ).replace("\\", "/") if self.verbose: print( "Found MasterLattice Directory 1-up =", self.global_parameters["master_lattice_location"], ) elif os.path.isdir( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../MasterLattice" ) + "/" ): self.global_parameters["master_lattice_location"] = ( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../MasterLattice" ) + "/" ).replace("\\", "/") if self.verbose: print( "Found MasterLattice Directory 1-up =", self.global_parameters["master_lattice_location"], ) else: if self.verbose: print( "Master Lattice not available - specify using master_lattice=<location>" ) 
self.global_parameters["master_lattice_location"] = "." else: self.global_parameters["master_lattice_location"] = os.path.join( os.path.abspath(master_lattice), "./" ) MasterLatticeLocation = self.global_parameters["master_lattice_location"]
[docs] def setSimCodesLocation(self, simcodes: str | None = None) -> None: """ Set the location of the :ref:`SimCodes` package. This then also sets the `simcodes_location` in :attr:`~global_parameters`. Parameters ---------- simcodes: str The full path to the SimCodes folder """ global SimCodesLocation if simcodes is None: if SimCodesLocation is not None: self.global_parameters["simcodes_location"] = SimCodesLocation.replace( "\\", "/" ) if self.verbose: print( "Found SimCodes Package =", self.global_parameters["simcodes_location"], ) elif os.path.isdir( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../../SimCodes/SimCodes" ) + "/" ): self.global_parameters["simcodes_location"] = ( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../../SimCodes/SimCodes" ) + "/" ).replace("\\", "/") if self.verbose: print( "Found SimCodes Directory 2-up =", self.global_parameters["simcodes_location"], ) elif os.path.isdir( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../SimCodes/SimCodes" ) + "/" ): self.global_parameters["simcodes_location"] = ( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../SimCodes/SimCodes" ) + "/" ).replace("\\", "/") if self.verbose: print( "Found SimCodes Directory 1-up =", self.global_parameters["simcodes_location"], ) elif os.path.isdir( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../SimCodes" ) + "/" ): self.global_parameters["simcodes_location"] = ( os.path.abspath( os.path.dirname(os.path.abspath(__file__)) + "/../SimCodes" ) + "/" ).replace("\\", "/") if self.verbose: print( "Found SimCodes Directory 1-up =", self.global_parameters["simcodes_location"], ) else: if self.verbose: print("SimCodes not available - specify using simcodes=<location>") self.global_parameters["simcodes_location"] = None else: self.global_parameters["simcodes_location"] = os.path.join( os.path.abspath(simcodes), "./" ) SimCodesLocation = self.global_parameters["simcodes_location"]
[docs] def load_Elements_File(self, inp: str | list | tuple | dict) -> None: """ Load a YAML file or list of YAML files with element definitions. The `elements` entry in this file(s) are then parsed and read into :attr:`~elementObjects` in order to build up the :class:`~SimulationFramework.Framework_objects.frameworkLattice` object. Parameters ---------- inp: str or list or tuple or dict Input file or dict containing the lattice `elements` """ if isinstance(inp, (list, tuple)): filename = inp else: filename = [inp] for f in filename: if os.path.isfile(f): with open(f, "r") as stream: elements = yaml.safe_load(stream)["elements"] elif os.path.isfile(self.subdirectory + "/" + f): with open(self.subdirectory + "/" + f, "r") as stream: elements = yaml.safe_load(stream)["elements"] else: with open( self.global_parameters["master_lattice_location"] + f, "r" ) as stream: elements = yaml.safe_load(stream)["elements"] for name, elem in list(elements.items()): self.read_Element(name, elem)
def loadSettings(
    self,
    filename: str | None = None,
    settings: FrameworkSettings | None = None,
) -> None:
    """
    Load lattice settings from a file or from a ready-made settings object.

    The settings supply the global section, an optional generator, the lattice
    lines (`files`), the elements, and optional groups and changes. Elements are
    read first, then groups, then lattices; the changes are applied last, and a
    deep copy of every element (plus the generator) is stored in
    :attr:`~original_elementObjects`.

    Parameters
    ----------
    filename: str or None
        Name of .def file containing lattice definitions; if it is not an
        existing file it is looked up under `master_lattice_location`
    settings: FrameworkSettings or None
        Settings for the lattice; used only when `filename` is not a string
    """

    def _optional_section(key):
        # A section that is absent or None counts as empty.
        if key in self.settings and self.settings[key] is not None:
            return self.settings[key]
        return {}

    if isinstance(filename, str):
        self.settingsFilename = filename
        self.settings = FrameworkSettings()
        if os.path.isfile(filename):
            self.settings.loadSettings(filename)
        else:
            self.settings.loadSettings(
                self.global_parameters["master_lattice_location"] + filename
            )
    elif isinstance(settings, FrameworkSettings):
        self.settingsFilename = settings.settingsFilename
        self.settings = settings

    self.globalSettings = self.settings["global"]
    if "generator" in self.settings and len(self.settings["generator"]) > 0:
        self.generatorSettings = self.settings["generator"]
        self.add_Generator(**self.generatorSettings)
    self.fileSettings = self.settings["files"] if "files" in self.settings else {}
    elements = self.settings["elements"]
    groups = _optional_section("groups")
    changes = _optional_section("changes")

    for elem_name, elem_def in list(elements.items()):
        self.read_Element(elem_name, elem_def)

    for group_name, group_def in list(groups.items()):
        # Group definitions without a "type" are skipped.
        if "type" in group_def:
            group_class = getattr(frameworkElements, group_def["type"])
            self.groupObjects[group_name] = group_class(
                group_name,
                self,
                global_parameters=self.global_parameters,
                **group_def,
            )

    for lattice_name, lattice_def in list(self.fileSettings.items()):
        self.read_Lattice(lattice_name, lattice_def)

    self.apply_changes(changes)

    # Snapshot the post-change state so later edits can be detected.
    snapshot = {name: deepcopy(obj) for name, obj in self.elementObjects.items()}
    snapshot["generator"] = deepcopy(self.generator)
    self.original_elementObjects = snapshot
def save_settings(
    self,
    filename: str | None = None,
    directory: str = ".",
    elements: dict | None = None,
) -> None:
    """
    Write the lattice settings to a YAML file.

    Parameters
    ----------
    filename: str or None
        Filename to which the settings will be saved; defaults to `settings.def`
    directory: str
        Directory to which the settings will be saved
    elements: dict or None
        If given, replaces the `elements` entry of the copied settings before saving
    """
    target_name = "settings.def" if filename is None else filename
    payload = self.settings.copy()
    if elements is not None:
        payload["elements"] = elements
    payload = convert_numpy_types(payload)
    with open(directory + "/" + target_name, "w") as yaml_file:
        # NOTE(review): this sets an attribute on the yaml module rather than
        # passing default_flow_style= to the dump call - confirm it has the
        # intended effect.
        yaml.default_flow_style = True
        yaml.safe_dump(payload, yaml_file, sort_keys=False)
def read_Lattice(self, name: str, lattice: dict) -> None:
    """
    Create the ``<code>Lattice`` object for one lattice line and store it in
    :attr:`~latticeObjects`.

    The class is looked up in the lattice module as the lower-cased code
    followed by ``Lattice``.

    Parameters
    ----------
    name: str
        The name of the lattice line
    lattice: dict
        Dictionary containing settings for the lattice line; must contain `code`

    Raises
    ------
    KeyError
        If `lattice` has no `code` entry
    NotImplementedError
        If the lower-cased code is not in ``supported_codes``
    """
    if "code" not in lattice:
        raise KeyError(f"code must be provided for {lattice}")
    code = lattice["code"]
    code_key = code.lower()
    if code_key not in supported_codes:
        raise NotImplementedError(f"code {code} is not supported")
    lattice_type = code_key + "Lattice"
    lattice_class = getattr(frameworkLattices, lattice_type)
    self.latticeObjects[name] = lattice_class(
        name=name,
        objectname=name,
        objecttype=lattice_type,
        file_block=lattice,
        elementObjects=self.elementObjects,
        groupObjects=self.groupObjects,
        runSettings=self.runSetup,
        settings=self.settings,
        executables=self.executables,
        global_parameters=self.global_parameters,
    )
def detect_changes(
    self,
    elementtype: str | None = None,
    elements: list | None = None,
) -> dict:
    """
    Detect lattice changes from the original loaded lattice and return a dictionary of changes.

    Two modes are used, chosen from the first entry of the list being checked:

    - if that entry is a list, tuple or dict with more than one item, every entry
      is treated as an ``(element, parameter)`` pair and the *current* value of that
      parameter is reported without comparing it to the original;
    - otherwise every entry is an element name, and the element is compared with
      its copy in :attr:`~original_elementObjects`.

    Parameters
    ----------
    elementtype: str or None
        Element type to check; takes precedence over `elements`
    elements: list or None
        Elements to check; if both arguments are None, the generator and all elements are checked

    Returns
    -------
    dict
        Dictionary containing changes in the lattice, with element names and changed parameters
    """
    changedict = {}
    if elementtype is not None:
        changeelements = self.getElementType(elementtype, "objectname")
    elif elements is not None:
        changeelements = elements
    else:
        changeelements = ["generator"] + list(self.elementObjects.keys())
    if (
        len(changeelements) > 0
        and isinstance(changeelements[0], (list, tuple, dict))
        and len(changeelements[0]) > 1
    ):
        # Explicit (element, parameter) pairs: report current values.
        for ek in changeelements:
            new = None
            e, k = ek[:2]
            if e in self.elementObjects:
                new = self.elementObjects[e]
            elif e in self.groupObjects:
                new = self.groupObjects[e]
            # Names that are neither an element nor a group are skipped.
            if new is not None:
                if e not in changedict:
                    changedict[e] = {}
                changedict[e][k] = convert_numpy_types(getattr(new, k))
    else:
        for e in changeelements:
            element = None
            if e in self.elementObjects:
                element = self.elementObjects[e]
            elif e == "generator":
                element = self.generator
            cond = False
            # NOTE(review): this lookup raises KeyError for a name that has no
            # stored original - confirm callers only pass loaded elements.
            orig = self.original_elementObjects[e]
            new = element
            # Only frameworkElement instances are compared; anything else
            # leaves `cond` False and is not reported.
            if isinstance(element, frameworkElement):
                kval = [
                    k for k in new.model_fields_set if k not in disallowed_changes
                ]
                new_model_fields = {
                    k: getattr(new, k)
                    for k in new.model_fields_set
                    if k not in disallowed_changes
                }
                orig_model_fields = {
                    k: getattr(orig, k)
                    for k in orig.model_fields_set
                    if k not in disallowed_changes
                }
                # A field set only on the new element, or set on both with a
                # different value, marks the element as changed.
                for k in kval:
                    if k not in list(orig_model_fields.keys()):
                        cond = True
                    elif new_model_fields[k] != orig_model_fields[k]:
                        cond = True
            if cond:
                orig = self.original_elementObjects[e]
                new = element
                # NOTE(review): `k` is whatever iterating the element yields and is
                # indexed as k[0] for the attribute name; `k in orig` relies on the
                # membership semantics of the element class, which are not visible
                # here - confirm these comprehensions select what is intended.
                changedict[e] = {
                    k[0]: convert_numpy_types(getattr(new, k[0]))
                    for k in new
                    if k in orig
                    and not getattr(new, k[0]) == getattr(orig, k[0])
                    and k[0] not in disallowed_changes
                }
                changedict[e].update(
                    {
                        k[0]: convert_numpy_types(getattr(new, k[0]))
                        for k in new
                        if k not in orig and k[0] not in disallowed_changes
                    }
                )
                # Drop the entry again if nothing reportable was collected.
                if changedict[e] == {}:
                    del changedict[e]
    return changedict
[docs] def save_changes_file( self, filename: str | None = None, typ: str | None = None, elements: dict | None = None, dictionary: bool = False, ) -> dict | None: """ Save a file, or returns a dictionary, of detected changes in the lattice from the loaded version. Parameters ---------- filename: str or None Name of file containing changes; defaults to `changes.yaml` typ: str or None Element types to check; if `None`, check all elements: dict or None Dictionary containing elements and parameters to check; if `None`, check all dictionary: bool Flag to return changes as dictionary; if False, save a YAML file Returns ------- dict or None If `dictionary`, return a dict; otherwise, save a file and return `None` """ if filename is None: pre, ext = os.path.splitext(os.path.basename(self.settingsFilename)) filename = pre + "_changes.yaml" changedict = self.detect_changes(elementtype=typ, elements=elements) if dictionary: return changedict else: with open(filename, "w") as yaml_file: yaml.default_flow_style = True yaml.dump(changedict, yaml_file)
[docs] def save_lattice( self, lattice: str | None = None, filename: str | None = None, directory: str = ".", dictionary: bool = False, ) -> dict | None: """ Save lattice to a file, or return a dictionary containing the lattice elements Parameters ---------- lattice: str or None Name of lattice file; if `None`, sets to the name of the lattice filename: str or None Name of the file to be saved; if `None`, sets to the name of the lattice + '_lattice.yaml' directory: str Directory to which the file will be saved dictionary: bool Flag to save lattice as dictionary; if False, save a YAML file Returns ------- dict or None If `dictionary`, return a dict; otherwise, save a file and return `None` """ if filename is None: pre, ext = os.path.splitext(os.path.basename(self.settingsFilename)) else: pre, ext = os.path.splitext(os.path.basename(filename)) dic = dict({"elements": dict()}) latticedict = dic["elements"] if lattice is None: elements = list(self.elementObjects.keys()) filename = pre + ".yaml" else: if self.latticeObjects[lattice].elements is None: return elements = list(self.latticeObjects[lattice].elements.keys()) filename = pre + "_" + lattice + "_lattice.yaml" for e in elements: new = self.elementObjects[e] # try: if ( "subelement" in new and not new["subelement"] ) or "subelement" not in new: # latticedict.update({e: {}}) # for k in new: # if k not in disallowed: # latticedict[e][k[0].replace("object", "")] = convert_numpy_types(getattr(new, k[0])) latticedict[e] = { k[0].replace("object", ""): convert_numpy_types(getattr(new, k[0])) for k in new if k[0] not in disallowed and getattr(new, k[0]) is not None } if "sub_elements" in new: latticedict[e].update({"sub_elements": {}}) for subelem in new["sub_elements"]: newsub = self.elementObjects[subelem] latticedict[e]["sub_elements"][subelem] = { k[0].replace("object", ""): convert_numpy_types( getattr(newsub, k[0]) ) for k in newsub if k not in disallowed } # except Exception: # print("##### ERROR IN CHANGE ELEMS: ", 
e, new) # pass if dictionary: return latticedict else: with open(directory + "/" + filename, "w") as yaml_file: yaml.default_flow_style = True yaml.dump(dic, yaml_file)
[docs] def load_changes_file( self, filename: str | tuple | list | None = None, apply: bool = True, verbose: bool = False, ) -> dict | list: """ Loads a saved changes file and applies the settings to the current lattice. Returns a list of changes. See :func:`~apply_changes`. Parameters ---------- filename: str or list or tuple or None Changes filename to save; if `None`, base it on the settings filename apply: bool Flag to apply the changes verbose: bool Flag to print the changes applied Returns ------- dict or list If `filename` is a `list` or `tuple`, call this function again If `filename` is `None` or a `str`, return the dictionary of changes. """ if isinstance(filename, (tuple, list)): return [self.load_changes_file(c) for c in filename] else: if filename is None: pre, ext = os.path.splitext(os.path.basename(self.settingsFilename)) filename = pre + "_changes.yaml" with open(filename, "r") as infile: changes = dict(yaml.safe_load(infile)) if apply: self.apply_changes(changes, verbose=verbose) return changes
def apply_changes(self, changes: dict, verbose: bool = False) -> None:
    """
    Apply a dictionary of changes to the current lattice.

    A name found in :attr:`~elementObjects` is changed through ``modifyElement``;
    a name found in :attr:`~groupObjects` is changed through that group's
    ``change_Parameter``. A name present in both receives both; a name present
    in neither is ignored.

    Parameters
    ----------
    changes: dict
        Dictionary of changes, keyed by element or group name and containing
        parameters and values to change
    verbose: bool
        Flag to print each modification as it is made
    """
    for target, params in list(changes.items()):
        if target in self.elementObjects:
            for key, value in list(params.items()):
                self.modifyElement(target, key, value)
                if verbose:
                    print("modifying ", target, "[", key, "]", " = ", value)
        if target in self.groupObjects:
            group = self.groupObjects[target]
            for key, value in list(params.items()):
                group.change_Parameter(key, value)
                if verbose:
                    print("modifying ", target, "[", key, "]", " = ", value)
def check_lattice(self, decimals: int = 4) -> bool:
    """
    Check every element for a mismatch between its stored end and the end
    computed from its start, length and rotation.

    For a dipole with a non-zero angle the displacement follows an arc of radius
    ``length / angle``; for everything else it is ``length`` along the third axis.
    The displacement is rotated using the first component of ``global_rotation``
    and added to ``position_start``; the result is compared with ``elem.end``
    after rounding. Each mismatch is printed.

    Parameters
    ----------
    decimals: int
        Number of decimals to which the difference is rounded

    Returns
    -------
    bool
        True if no mismatch was found, False otherwise
    """
    noerror = True
    for elem in self.elementObjects.values():
        start = elem.position_start
        end = elem.end
        length = elem.length
        theta = elem.global_rotation[0]
        bends = elem.objecttype == "dipole" and abs(float(elem.angle)) > 0
        if bends:
            angle = float(elem.angle)
            rho = length / angle
            clength = np.array([rho * (np.cos(angle) - 1), 0, rho * np.sin(angle)])
        else:
            clength = np.array([0, 0, length])
        cend = start + np.dot(clength, _rotation_matrix(theta))
        residual = np.round(cend - end, decimals=decimals)
        if residual.any():
            noerror = False
            print("check_lattice error:", elem.objectname, cend, end, cend - end)
    return noerror
def check_lattice_drifts(self, decimals: int = 4) -> bool:
    """
    Check every element for a mismatch between its stored end position and the
    end computed from its start, length and rotation.

    For a dipole with a non-zero angle the displacement follows an arc of radius
    ``length / angle``; for everything else it is ``length`` along the third axis.
    The displacement is rotated using the third component of ``global_rotation``
    and added to ``position_start``; the result is compared with
    ``position_end`` after rounding. Each mismatch is printed.

    Parameters
    ----------
    decimals: int
        Number of decimals to which the difference is rounded

    Returns
    -------
    bool
        True if no mismatch was found, False otherwise
    """
    noerror = True
    for elem in self.elementObjects.values():
        start = elem.position_start
        end = elem.position_end
        length = elem.length
        theta = elem.global_rotation[2]
        bends = elem.objecttype == "dipole" and abs(float(elem.angle)) > 0
        if bends:
            angle = float(elem.angle)
            rho = length / angle
            clength = np.array([rho * (np.cos(angle) - 1), 0, rho * np.sin(angle)])
        else:
            clength = np.array([0, 0, length])
        cend = start + np.dot(clength, _rotation_matrix(theta))
        residual = np.round(cend - end, decimals=decimals)
        if residual.any():
            noerror = False
            print(
                "check_lattice_drifts error:",
                elem.objectname,
                cend,
                end,
                cend - end,
            )
    return noerror
[docs] def change_Lattice_Code( self, latticename: str, code: str, exclude: str | list | tuple | None = None, ) -> None: """ Changes the tracking code for a given lattice. Parameters ---------- latticename: str Name of the lattice line defined in the :attr:`~latticeObjects` code: str Simulation code to use for `latticename`; can be `All` exclude: Exclude certain lines from this function """ if latticename == "All": [self.change_Lattice_Code(lo, code, exclude) for lo in self.latticeObjects] elif isinstance(latticename, (tuple, list)): [self.change_Lattice_Code(ln, code, exclude) for ln in latticename] else: if not latticename == "generator" and not ( latticename == exclude or (isinstance(exclude, (list, tuple)) and latticename in exclude) ): if code.lower() not in supported_codes: raise NotImplementedError(f"code {code} is not supported") # print('Changing lattice ', name, ' to ', code.lower()) currentLattice = self.latticeObjects[latticename] self.latticeObjects[latticename] = getattr( frameworkLattices, code.lower() + "Lattice" )( name=currentLattice.objectname, file_block=currentLattice.file_block, elementObjects=self.elementObjects, groupObjects=self.groupObjects, runSettings=self.runSetup, settings=self.settings, executables=self.executables, global_parameters=self.global_parameters, )
def read_Element(
    self,
    elementname: str,
    element: dict,
    subelement: bool = False,
    parent: str = None,
) -> None:
    """
    Read one element definition and create the element together with any sub-elements.

    The special name `filename` means `element` names one or more element files,
    which are loaded through :func:`~load_Elements_File`.

    Parameters
    ----------
    elementname: str
        Name of element
    element: dict
        Dictionary containing the element definition. For a sub-element the
        `subelement` and `parent` keys are removed from this dictionary in place
    subelement: bool
        Flag to indicate whether this element is a sub-element of another,
        i.e. a solenoid around an RF cavity
    parent: str
        Name of the parent element if this element is a subelement
    """
    if elementname == "filename":
        self.load_Elements_File(element)
        return

    if subelement:
        # The flag and parent are passed explicitly below, so drop the
        # copies carried by the definition itself.
        for redundant in ("subelement", "parent"):
            if redundant in element:
                del element[redundant]
        self.add_Element(elementname, subelement=True, parent=parent, **element)
    else:
        self.add_Element(elementname, **element)

    if "sub_elements" in element:
        for child_name, child_def in list(element["sub_elements"].items()):
            self.read_Element(
                child_name, child_def, subelement=True, parent=elementname
            )
def add_Element(
    self,
    name: str | None = None,
    typ: str | None = None,
    **kwargs,
) -> frameworkElement | None:
    """
    Instantiate an element and register it in :attr:`~elementObjects`.

    The element class is looked up by `typ` in the elements module and built with
    `objectname`, `objecttype`, the framework's :attr:`~global_parameters` and all
    remaining keyword arguments; its ``update_field_definition`` is then called.

    Parameters
    ----------
    name: str
        Name of the element to add; taken from ``kwargs["name"]`` if `None`
    typ: str
        Type of element; see :ref:`framework-elements`. Taken from
        ``kwargs["type"]`` if `None`

    Returns
    -------
    :class:`~SimulationFramework.Framework_objects.frameworkElement`
        The newly created element

    Raises
    ------
    NameError
        If the element does not have a `name`
    """
    if name is None:
        if "name" not in kwargs:
            raise NameError("Element does not have a name")
        name = kwargs["name"]
    if typ is None:
        typ = kwargs["type"]
    element_class = getattr(frameworkElements, typ)
    element = element_class(
        objectname=name,
        objecttype=typ,
        global_parameters=self.global_parameters,
        **kwargs,
    )
    element.update_field_definition()
    self.elementObjects[name] = element
    return element
def replace_Element(
    self, name: str | None = None, typ: str | None = None, **kwargs
) -> frameworkElement:
    """
    Replace an existing element with a new one of type `typ`.

    The properties of the existing element (everything in its ``objectproperties``
    except `objectname` and `objecttype`) are merged with `kwargs` and passed to
    the new element, which takes the old element's place in :attr:`~elementObjects`.

    Parameters
    ----------
    name: str
        Name of the element to replace; taken from ``kwargs["name"]`` if `None`
    typ: str
        Type of element; see :ref:`framework-elements`

    Returns
    -------
    :class:`~SimulationFramework.Framework_objects.frameworkElement`
        The replacement element

    Raises
    ------
    NameError
        If the element does not have a `name`
    """
    if name is None:
        if "name" not in kwargs:
            raise NameError("Element does not have a name")
        name = kwargs["name"]
    original_element = self.getElement(name)
    identity_keys = ("objectname", "objecttype")
    original_properties = {
        prop: original_element[prop]
        for prop in original_element.objectproperties
        if prop not in identity_keys
    }
    # NOTE(review): which side wins on a key clash depends on
    # merge_two_dicts, which is not visible here - confirm.
    new_properties = merge_two_dicts(kwargs, original_properties)
    element_class = getattr(frameworkElements, typ)
    element = element_class(name, typ, **new_properties)
    self.elementObjects[name] = element
    return element
def getElement(
    self,
    element: str,
    param: str | None = None,
) -> dict | Any | frameworkElement:
    """
    Look up an element by name and return it, or one of its parameters

    Parameters
    ----------
    element: str
        Name of element to get
    param: str or None
        Parameter to retrieve (lower-cased before lookup); if `None`,
        the entire element is returned

    Returns
    -------
    dict or Any or :class:`~SimulationFramework.Framework_objects.frameworkElement`
        The requested parameter; the whole element if `param` is `None` or the
        element has no such attribute (a warning is issued in the latter case);
        an empty dictionary (with a warning) if the lookup yields `None`
    """
    found = self.__getitem__(element)
    if found is None:
        warn(f"WARNING: Element {element} does not exist")
        return {}
    if param is None:
        return found
    param = param.lower()
    try:
        return getattr(found, param)
    except AttributeError:
        warn(f"WARNING: Element {element} does not have parameter {param}; returning full element")
        return found
[docs] def getElementType( self, typ: str | list | tuple, param: str | None = None, ) -> dict | list | Any: """ Gets all elements of the specified type, or the parameter of each of those elements Parameters ---------- typ: list or str or tuple Type or list of types to get param: str or None Parameters to retrieve; if `None`, get the entire object Returns ------- dict or list or Any Get `param` for all elements, or all elements, or recall this function recursively if `param` is a list or tuple """ if isinstance(typ, (list, tuple)): return [self.getElementType(t, param=param) for t in typ] if isinstance(param, (list, tuple)): return zip(*[self.getElementType(typ, param=p) for p in param]) # return [item for sublist in all_elements for item in sublist] return [ ( {"name": element, **self.elementObjects[element].model_dump()} if param is None else getattr(self.elementObjects[element], param) ) for element in list(self.elementObjects.keys()) if self.elementObjects[element].objecttype.lower() == typ.lower() ]
def setElementType(
    self,
    typ: str,
    setting: str,
    values: Any,
) -> None:
    """
    Modifies the specified parameter of each element of a given type

    The elements are obtained from :func:`~getElementType` and paired with
    `values` in order; each value is assigned with ``setattr``.

    Parameters
    ----------
    typ: str
        All elements of a given type to set
    setting: str
        Parameter in those elements to set
    values: Any
        Sized collection of values, one per element of that type

    Raises
    ------
    ValueError
        If there is a mismatch between the length of `values` and the number of
        elements of that type; nothing is modified in that case
    """
    elems = self.getElementType(typ)
    if len(elems) != len(values):
        # Report both counts so the caller can see what went wrong.
        raise ValueError(
            f"Number of values ({len(values)}) does not match the number of "
            f"'{typ}' elements ({len(elems)})"
        )
    for elem, value in zip(elems, values):
        setattr(self[elem["name"]], setting, value)
[docs] def modifyElement( self, elementName: str, parameter: str | list | dict, value: Any = None, ) -> None: """ Modifies an element parameter Parameters ---------- elementName: str Name of element to modify parameter: list or str or dict Parameter to modify value: Value to set on that element """ if isinstance(parameter, dict) and value is None: for p, v in parameter.items(): self.modifyElement(elementName, p, v) elif isinstance(parameter, list) and isinstance(value, list): if len(parameter) != len(value): warn("parameter and value must be of the same length") for p, v in zip(parameter, value): self.modifyElement(elementName, p, v) elif elementName in self.groupObjects: self.groupObjects[elementName].change_Parameter(parameter, value) elif elementName in self.elementObjects: setattr(self.elementObjects[elementName], parameter, value) else: warn("incorrect parameters passed to modifyElement")
[docs] def modifyElements( self, elementNames: str | list, parameter: str | list | dict, value: Any = None, ) -> None: """ Modifies parameters for multiple elements Parameters ---------- elementNames: str or list Name(s) of element to modify parameter: list or str or dict Parameter to modify value: Value to set on those elements """ if isinstance(elementNames, str): if elementNames.lower() == "all": elementNames = self.elementObjects.keys() else: elementNames = [elementNames] for elem in elementNames: self.modifyElement(elem, parameter, value)
def modifyElementType(
    self,
    elementType: str,
    parameter: str,
    value: Any,
) -> None:
    """
    Set the same parameter value on every element of a given type

    Parameters
    ----------
    elementType: str
        Type of element to modify (matched via :func:`~getElementType`)
    parameter: str
        Parameter of those elements to modify
    value: Any
        Value to set on each matching element
    """
    # Collect all names up front, then modify each element in turn.
    target_names = [entry["name"] for entry in self.getElementType(elementType)]
    for target in target_names:
        self.modifyElement(target, parameter, value)
[docs] def modifyLattice( self, latticeName: str, parameter: str | list | dict, value: Any = None, ) -> None: """ Modify a lattice definition, Parameters ---------- latticeName: str Name of lattice to modify parameter: str or list or dict Parameter(s) to update with their values value: Any Value to update """ if isinstance(parameter, dict) and value is None: for p, v in parameter.items(): self.modifyLattice(latticeName, p, v) elif isinstance(parameter, list) and isinstance(value, list): for p, v in parameter: self.modifyLattice(latticeName, p, v) elif latticeName in self.latticeObjects: setattr(self.latticeObjects[latticeName], parameter, value)
[docs] def modifyLattices( self, latticeNames: str | list, parameter: str | list | dict, value: Any = None, ) -> None: """ Modify a lattice definition for a list of lattices Parameters ---------- latticeNames: str or list Name of lattice(s) to modify parameter: str or list or dict Parameter(s) to update with their values value: Any Value to update """ if isinstance(latticeNames, str): if latticeNames.lower() == "all": latticeNames = self.latticeObjects.keys() else: latticeNames = [latticeNames] for latt in latticeNames: self.modifyLattice(latt, parameter, value)
def add_Generator(
    self,
    default: str | None = None,
    **kwargs,
) -> None:
    """
    Create the distribution generator from keyword settings

    The generator class is ``GPTGenerator`` when ``kwargs["code"]`` is ``"gpt"``
    (any case) and ``ASTRAGenerator`` otherwise. The new object is stored in
    :attr:`~generator` and under the ``"generator"`` key of :attr:`~latticeObjects`.

    Parameters
    ----------
    default: str or None
        Key into ``generator_keywords["defaults"]``; when present there, those
        defaults are combined with `kwargs` via ``merge_two_dicts``
    """
    use_gpt = "code" in kwargs and kwargs["code"].lower() == "gpt"
    generator_class = GPTGenerator if use_gpt else ASTRAGenerator
    known_defaults = generator_keywords["defaults"]
    if default in known_defaults:
        settings = merge_two_dicts(kwargs, known_defaults[default])
    else:
        settings = kwargs
    self.generator = generator_class(
        self.executables, self.global_parameters, **settings
    )
    self.latticeObjects["generator"] = self.generator
def change_generator(
    self,
    generator: str,
) -> None:
    """
    Changes the generator from one type to another.

    A new generator object is built with the keyword arguments of the current
    :attr:`~generator` and stored both in :attr:`~generator` and under the
    ``"generator"`` key of :attr:`~latticeObjects`, matching what
    :func:`~add_Generator` does.

    Parameters
    ----------
    generator: str
        The generator code to which the generator object should be changed;
        ``"gpt"`` selects ``GPTGenerator``, anything else selects
        ``ASTRAGenerator`` (with a warning if it is not ``"astra"``).
    """
    old_kwargs = self.generator.kwargs
    code = generator.lower()
    if code == "gpt":
        generator_class = GPTGenerator
    else:
        if code != "astra":
            warn(f"generator {generator} not supported; defaulting to ASTRA")
        generator_class = ASTRAGenerator
    # Update the attribute as well as the lattice entry: other methods read
    # `self.generator`, which previously kept pointing at the old object.
    self.generator = generator_class(
        self.executables, self.global_parameters, **old_kwargs
    )
    self.latticeObjects["generator"] = self.generator
def loadParametersFile(self, file):
    """Placeholder for loading a parameters file; the body is empty, so `file` is ignored."""
    pass
def saveParametersFile(self, file: str, parameters: dict | list | tuple) -> None:
    """
    Saves a list of parameters to a YAML file

    Each parameter dictionary is written under its ``"name"`` entry, leaving
    out any keys listed in ``disallowed``.

    Parameters
    ----------
    file: str
        Path of the file to write (opened in ``"w"`` mode)
    parameters: dict or list or tuple
        A dictionary containing a ``"name"`` key, or a list/tuple of such
        dictionaries

    Notes
    -----
    Malformed input only triggers a warning; the file is still written with
    whatever could be collected (possibly an empty mapping).
    """
    output = {}
    if isinstance(parameters, dict):
        try:
            # Filter on the key, exactly as the list branch does; filtering on
            # the value compared parameter values against key names.
            output[parameters["name"]] = {
                key: val for key, val in parameters.items() if key not in disallowed
            }
        except KeyError:
            warn("parameters dictionary must contain 'name' key")
    elif isinstance(parameters, (list, tuple)):
        try:
            for entry in parameters:
                output[entry["name"]] = {
                    key: entry[key] for key in entry if key not in disallowed
                }
        except (TypeError, KeyError):
            # KeyError covers a dictionary without 'name', which the warning
            # text already describes.
            warn(
                "parameters must be a dictionary or a list of dictionaries containing a 'name' key"
            )
    else:
        warn("could not parse parameters; they should be a dict, list or tuple")
    with open(file, "w") as yaml_file:
        # NOTE(review): this only sets an attribute on the yaml module and does
        # not appear to influence yaml.dump; pass `default_flow_style=...` to
        # dump explicitly if flow style is intended. Kept to preserve behavior.
        yaml.default_flow_style = True
        yaml.dump(output, yaml_file)
def set_lattice_prefix(
    self,
    lattice: str,
    prefix: str,
) -> None:
    """
    Forward a new prefix to the named lattice's ``set_prefix`` method

    A warning listing the valid names is issued when `lattice` is not a key
    of :attr:`~latticeObjects`.

    Parameters
    ----------
    lattice: str
        Name of lattice
    prefix: str
        Lattice prefix
    """
    if lattice not in self.latticeObjects:
        warn(
            f"{lattice} not found in latticeObjects; valid lattices are {list(self.latticeObjects.keys())}"
        )
        return
    self.latticeObjects[lattice].set_prefix(prefix)
def set_lattice_sample_interval(
    self,
    lattice: str,
    interval: int,
) -> None:
    """
    Assign ``sample_interval`` on the named lattice

    A warning listing the valid names is issued when `lattice` is not a key
    of :attr:`~latticeObjects`. See
    :attr:`~SimulationFramework.Framework_objects.frameworkLattice.sample_interval`.

    Parameters
    ----------
    lattice: str
        Name of lattice
    interval: int
        Sampling interval in units of 2 ** (3 * interval)
    """
    if lattice not in self.latticeObjects:
        warn(
            f"{lattice} not found in latticeObjects; valid lattices are {list(self.latticeObjects.keys())}"
        )
        return
    self.latticeObjects[lattice].sample_interval = interval
def __getitem__(self, key: str) -> Any:
    """
    Resolve `key` against elements, then lattices, then groups, then attributes

    Returns `None` when nothing matches (any error from the attribute lookup
    is swallowed).
    """
    for registry_name in ("elementObjects", "latticeObjects", "groupObjects"):
        registry = getattr(self, registry_name)
        if key in list(registry.keys()):
            return registry.get(key)
    try:
        return getattr(self, key)
    except Exception:
        return None


@property
def elements(self) -> list:
    """
    Names of all elements, i.e. the keys of :attr:`~elementObjects`

    Returns
    -------
    list
        List of element names
    """
    return list(self.elementObjects)


@property
def groups(self) -> list:
    """
    Names of all groups, i.e. the keys of :attr:`~groupObjects`

    Returns
    -------
    list
        List of group names
    """
    return list(self.groupObjects)


@property
def lines(self) -> list:
    """
    Names of all lattices, i.e. the keys of :attr:`~latticeObjects`

    Returns
    -------
    list
        List of lattice names
    """
    return list(self.latticeObjects)


@property
def lattices(self) -> list:
    """
    Alias of :attr:`~lines`

    Returns
    -------
    list
        List of lattice names
    """
    return self.lines


@property
def commands(self) -> list:
    """
    Names of all command objects, i.e. the keys of :attr:`~commandObjects`

    Returns
    -------
    list
        List of command object names
    """
    return list(self.commandObjects)
def getSValues(self) -> list:
    """
    Concatenate the S values of every entry in :attr:`~latticeObjects`

    Each lattice's values (from its ``getSValues``) are shifted by the last S
    value accumulated so far. Entries for which this fails for any reason are
    skipped silently. See
    :func:`~SimulationFramework.Framework_objects.frameworkLattice.getSValues`.

    Returns
    -------
    list
        S values for all elements
    """
    offset = 0
    combined = []
    for lattice in self.latticeObjects.values():
        try:
            shifted = [s + offset for s in lattice.getSValues()]
            combined = combined + shifted
            offset = combined[-1]
        except Exception:
            # Best effort: skip entries that cannot supply S values.
            continue
    return combined
def getSValuesElements(self) -> list:
    """
    Build (name, element, s) tuples across all lattices except ``"generator"``

    Names, elements and S values come from each lattice's ``getSNamesElems``;
    the S values are shifted by the last S value of the previous lattice. See
    :func:`~SimulationFramework.Framework_objects.frameworkLattice.getSNamesElems`.

    Returns
    -------
    list
        Element names, element object and its S position
    """
    offset = 0
    collected = []
    for lattice_name, lattice in self.latticeObjects.items():
        if lattice_name == "generator":
            continue
        names, elems, svals = lattice.getSNamesElems()
        shifted = [s + offset for s in svals]
        collected = collected + list(zip(names, elems, shifted))
        offset = shifted[-1]
    return collected
def getZValuesElements(self) -> list:
    """
    Build (name, element, z) tuples across all lattices except ``"generator"``

    Names, elements and Z values come from each lattice's ``getZNamesElems``;
    the combined list is sorted by the first entry of each Z value. See
    :func:`~SimulationFramework.Framework_objects.frameworkLattice.getZNamesElems`.

    Returns
    -------
    list
        Element names, element object and its Z position
    """
    collected = []
    for lattice_name, lattice in self.latticeObjects.items():
        if lattice_name == "generator":
            continue
        names, elems, zvals = lattice.getZNamesElems()
        collected = collected + list(zip(names, elems, zvals))

    def first_z(entry):
        return entry[2][0]

    return sorted(collected, key=first_z)
def track(
    self,
    files: list | None = None,
    startfile: str | None = None,
    endfile: str | None = None,
    preprocess: bool = True,
    write: bool = True,
    track: bool = True,
    postprocess: bool = True,
    save_summary: bool = True,
    frameworkDirec: bool = False,
    check_lattice: bool = True,
) -> Any | None:
    """
    Tracks the current machine, or a subset based on the 'files' list.

    The lattice is optionally checked (:func:`~check_lattice`), then saved
    (:func:`~save_lattice`) together with the settings file
    (:func:`~save_settings`), and each selected line is pre-processed, written,
    run and post-processed in turn. :attr:`~progress` is updated after every
    stage and, when :attr:`~verbose` is set, a ``tqdm`` progress bar is shown.

    Parameters
    ----------
    files: list or None
        List of files (lattice names) to track; if `None`, track all
    startfile: str or None
        Initial lattice name for tracking; ignored if `None` or not in `files`
    endfile: str or None
        Final lattice name for tracking (inclusive); ignored if `None` or not in `files`
    preprocess: bool
        Call :func:`~SimulationFramework.Framework_objects.frameworkLattice.preProcess`
        before tracking each line (never called for the ``"generator"`` entry)
    write: bool
        Write each lattice file
    track: bool
        Track each lattice
    postprocess: bool
        Call :func:`~SimulationFramework.Framework_objects.frameworkLattice.postProcess`
        after tracking each line
    save_summary: bool
        Save beam and Twiss summary files
    frameworkDirec: bool
        If True, return a :class:`~SimulationFramework.Framework.frameworkDirectory` object
    check_lattice: bool
        Call :func:`~check_lattice` before tracking

    Returns
    -------
    :class:`~SimulationFramework.Framework.frameworkDirectory` or None
        Framework directory object if `frameworkDirec` is True

    Raises
    ------
    Exception
        If `check_lattice` is True and :func:`~check_lattice` returns a falsy value
    """
    if check_lattice:
        if not self.check_lattice():
            raise Exception("Lattice Error - check definitions")
    # Record the lattice and settings used for this run in the working directory.
    self.save_lattice(directory=self.subdirectory, filename="lattice.yaml")
    self.save_settings(
        directory=self.subdirectory,
        filename="settings.def",
        elements={"filename": "lattice.yaml"},
    )
    self.tracking = True
    self.progress = 0
    if files is None:
        # NOTE(review): "generator" is prepended only when no `generator`
        # attribute exists - confirm this is the intended condition.
        files = (
            ["generator"] + self.lines
            if not hasattr(self, "generator")
            else self.lines
        )
    # Trim the list to the [startfile, endfile] window when those names are present.
    if startfile is not None and startfile in files:
        index = files.index(startfile)
        files = files[index:]
    if endfile is not None and endfile in files:
        index = files.index(endfile)
        files = files[: index + 1]
    if self.verbose:
        # Four progress-bar ticks per file: pre-process, write, track, post-process.
        pbar = tqdm(total=len(files) * 4)
    # NOTE(review): an empty `files` list raises ZeroDivisionError here.
    percentage_step = 100 / len(files)
    for i in range(len(files)):
        base_percentage = 100 * (i / len(files))
        lattice_name = files[i]
        self.progress = base_percentage
        if lattice_name == "generator" and hasattr(self, "generator"):
            latt = self.generator
            base_description = "Generator[" + self.generator.code + "]"
        else:
            latt = self.latticeObjects[lattice_name]
            base_description = lattice_name + "[" + latt.code + "]"
        if self.verbose:
            pbar.set_description(base_description + ": ")
        # Each stage is optional, but progress advances whether or not it runs.
        if preprocess and lattice_name != "generator":
            if self.verbose:
                pbar.set_description(
                    base_description + ": pre-process "
                )
            latt.preProcess()
        self.progress = base_percentage + 0.25 * percentage_step
        if self.verbose:
            pbar.update()
        if write:
            if self.verbose:
                pbar.set_description(
                    base_description + ": write "
                )
            latt.write()
        self.progress = base_percentage + 0.5 * percentage_step
        if self.verbose:
            pbar.update()
        if track:
            if self.verbose:
                pbar.set_description(
                    base_description + ": track "
                )
            latt.run()
        self.progress = base_percentage + 0.75 * percentage_step
        if self.verbose:
            pbar.update()
        if postprocess:
            if self.verbose:
                pbar.set_description(
                    base_description + ": post-process "
                )
            latt.postProcess()
        self.progress = base_percentage + 1 * percentage_step
        if self.verbose:
            pbar.update()
    if self.verbose:
        pbar.set_description(base_description + ": Finished! ")
        pbar.close()
    if save_summary:
        self.save_summary_files()
    self.tracking = False
    if frameworkDirec:
        return frameworkDirectory(
            directory=self.subdirectory,
            twiss=True,
            beams=True,
            verbose=self.verbose,
        )
[docs] def postProcess( self, files: list | None = None, startfile: str | None = None, endfile: str | None = None, ) -> None: """ Post-processes the tracking files and converts them to HDF5. See :func:`~SimulationFramework.Framework_objects.frameworkLattice.postProcess` and the same function in the child classes for specific codes. Parameters ---------- files: list or None List of lattice names; if `None`, process all startfile: str or None Starting lattice object; if `None`, process from the start endfile: str or None End lattice object; if `None`, process to the end """ if files is None: files = ( ["generator"] + self.lines if not hasattr(self, "generator") else self.lines ) if startfile is not None and startfile in files: index = files.index(startfile) files = files[index:] if endfile is not None and endfile in files: index = files.index(endfile) files = files[: index + 1] for i in range(len(files)): latt = files[i] if latt == "generator" and hasattr(self, "generator"): self.generator.postProcess() else: self.latticeObjects[latt].postProcess()
def save_summary_files(self, twiss: bool = True, beams: bool = True) -> None:
    """
    Write the HDF5 summary files into :attr:`~subdirectory`

    Uses :func:`~SimulationFramework.Modules.Twiss.load_directory` and
    :func:`~SimulationFramework.Modules.Beams.save_HDF5_summary_file`.

    Parameters
    ----------
    twiss: bool
        If True, save `Twiss_Summary.hdf5` in :attr:`~subdirectory`
    beams: bool
        If True, save `Beam_Summary.hdf5` in :attr:`~subdirectory`
    """
    if twiss:
        twiss_path = self.subdirectory + "/" + "Twiss_Summary.hdf5"
        loaded = rtf.load_directory(self.subdirectory)
        loaded.save_HDF5_twiss_file(twiss_path)
    if beams:
        beam_path = self.subdirectory + "/" + "Beam_Summary.hdf5"
        rbf.save_HDF5_summary_file(self.subdirectory, beam_path)
def pushRunSettings(self) -> None:
    """
    Pass :attr:`~runSetup` to every lattice object via ``updateRunSettings``

    Only entries of :attr:`~latticeObjects` that are instances of one of
    ``latticeClasses`` are updated.
    """
    for lattice in self.latticeObjects.values():
        if not isinstance(lattice, tuple(latticeClasses)):
            continue
        lattice.updateRunSettings(self.runSetup)
def setNRuns(self, nruns: int) -> None:
    """
    Set the number of simulation runs and push the run settings to all lattices

    See :func:`~SimulationFramework.Framework.runSetup.setNRuns`.

    Parameters
    ----------
    nruns: int
        Number of runs to set up
    """
    run_setup = self.runSetup
    run_setup.setNRuns(nruns)
    self.pushRunSettings()
def setSeedValue(self, seed: int) -> None:
    """
    Set the random number seed and push the run settings to all lattices

    See :func:`~SimulationFramework.Framework.runSetup.setSeedValue`.

    Parameters
    ----------
    seed: int
        Random number seed
    """
    run_setup = self.runSetup
    run_setup.setSeedValue(seed)
    self.pushRunSettings()
def loadElementErrors(self, file: str) -> None:
    """
    Load an element errors file and push the run settings to all lattices

    See :func:`~SimulationFramework.Framework.runSetup.loadElementErrors`.

    Parameters
    ----------
    file: str
        Errors file
    """
    run_setup = self.runSetup
    run_setup.loadElementErrors(file)
    self.pushRunSettings()
def setElementScan(
    self,
    name: str,
    item: str,
    scanrange: list,
    multiplicative: bool = False,
) -> None:
    """
    Register a scan of one parameter of one element, then push the run settings

    See :class:`~SimulationFramework.Framework.runSetup.setElementScan`.

    Parameters
    ----------
    name: str
        Name of element to scan
    item: str
        Parameter of that element to scan
    scanrange: list
        List of values to set
    multiplicative: bool
        Flag to indicate whether settings are multiplicative or additive
        with respect to the original value
    """
    scan_settings = {
        "name": name,
        "item": item,
        "scanrange": scanrange,
        "multiplicative": multiplicative,
    }
    self.runSetup.setElementScan(**scan_settings)
    self.pushRunSettings()
def _addLists(self, list1: list, list2: list) -> list:
    """Return the pairwise sums of two lists, stopping at the shorter one"""
    summed = []
    for left, right in zip(list1, list2):
        summed.append(left + right)
    return summed
[docs] def offsetElements( self, x: int | float = 0, y: int | float = 0, z: int | float = 0 ) -> None: """ Moves all elements by the set amount in (x, y, z) space. Updates :attr:`~elementObjects`. Parameters ---------- x: int | float x offset y: int | float y offset z: int | float z offset """ offset = [x, y, z] for latt in self.lines: if ( self.latticeObjects[latt].file_block is not None and "output" in self.latticeObjects[latt].file_block and "zstart" in self.latticeObjects[latt].file_block["output"] ): self.latticeObjects[latt].file_block["output"]["zstart"] += z for elem in self.elements: self.elementObjects[elem].centre = self._addLists( self.elementObjects[elem].centre, offset )
# self.elementObjects[elem].centre = self._addLists( # self.elementObjects[elem].centre, offset # )
class frameworkDirectory(BaseModel):
    """
    Class to load a tracking run from a directory and read the Beam and Twiss files
    and make them available

    On construction a :class:`~SimulationFramework.Framework.Framework` is created
    (unless one is supplied), the settings and any changes file are loaded, and the
    beam and Twiss data are read from the directory.
    """

    directory: str | None = None
    """Directory from which to load beam and Twiss files"""

    twiss: bool | rtf.twiss = True
    """Flag to indicate whether to load Twiss files"""

    beams: bool | rbf.beamGroup = False
    """Flag to indicate whether to load beam files"""

    verbose: bool = False
    """Flag to print status updates"""

    settings: str = "settings.def"
    """Framework settings filename"""

    changes: str = "changes.yaml"
    """Lattice changes filename"""

    rest_mass: float | None = None
    """Particle rest mass"""

    framework: Framework | None = None
    """:class:`~SimulationFramework.Framework.Framework` instance"""

    def __init__(
        self,
        *args,
        **kwargs,
    ) -> None:
        """
        Populate the model fields, then load the framework, beams and Twiss data

        All positional and keyword arguments are forwarded to the model
        constructor; the keyword arguments are also forwarded to ``Framework``
        when no framework instance was supplied.
        """
        super(frameworkDirectory, self).__init__(
            *args,
            **kwargs,
        )
        if not isinstance(self.framework, Framework):
            # No framework supplied: build one and load the settings file from
            # the given directory (or the current directory if none was given).
            directory = (
                "." if self.directory is None else os.path.abspath(self.directory)
            )
            self.framework = Framework(**kwargs)
            self.framework.loadSettings(directory + "/" + self.settings)
        else:
            self.framework = self.framework
            # Fall back to the framework's own subdirectory when none was given.
            if self.directory is None:
                directory = os.path.abspath(self.framework.subdirectory)
            else:
                directory = self.directory
        # Apply recorded lattice changes if a changes file exists in the directory.
        if os.path.exists(directory + "/" + self.changes):
            self.framework.load_changes_file(directory + "/" + self.changes)
        if self.beams:
            self.beams = rbf.load_HDF5_summary_file(
                os.path.join(directory, "Beam_Summary.hdf5")
            )
            if len(self.beams) < 1:
                # Summary file gave nothing: load the beams from the directory instead.
                print("No Summary File! Globbing...")
                self.beams = rbf.load_directory(directory)
            if self.rest_mass is None:
                # Take the rest mass from the loaded beams when available,
                # otherwise use the electron mass constant.
                if len(self.beams.param("particle_rest_energy")) > 0:
                    rest_mass = self.beams.param("particle_rest_energy")[0][0]
                else:
                    rest_mass = constants.m_e
            else:
                rest_mass = self.rest_mass
            self.twiss = rtf.twiss(rest_mass=rest_mass)
        else:
            self.beams = None
            self.twiss = rtf.twiss()
        # NOTE(review): `self.twiss` was replaced by a twiss object in both
        # branches above, so this test no longer reflects the original `twiss`
        # flag - confirm whether `twiss=False` is meant to skip loading.
        if self.twiss:
            self.twiss.load_directory(directory, verbose=self.verbose)

    # The plotting helpers are only defined when `use_matplotlib` is truthy.
    if use_matplotlib:

        def plot(self, *args, **kwargs):
            """
            Return a plot object; see :func:`~SimulationFramework.Modules.plotting.plotting.plot`.
            """
            return groupplot.plot(self, *args, **kwargs)

        def general_plot(self, *args, **kwargs):
            """
            Return a general_plot object; see
            :func:`~SimulationFramework.Modules.plotting.plotting.general_plot`.
            """
            return groupplot.general_plot(self, *args, **kwargs)

    def __repr__(self):
        """Return the ``repr`` of a dictionary holding the framework, twiss and beams objects"""
        return repr(
            {"framework": self.framework, "twiss": self.twiss, "beams": self.beams}
        )

    def save_summary_files(self, twiss: bool = True, beams: bool = True):
        """
        Save summary files by delegating to the framework object; see
        :func:`~SimulationFramework.Framework.Framework.save_summary_files`.

        Parameters
        ----------
        twiss: bool
            If True, save `Twiss_Summary.hdf5`
        beams: bool
            If True, save `Beam_Summary.hdf5`
        """
        self.framework.save_summary_files(twiss=twiss, beams=beams)

    def getScreen(self, screen: str) -> rbf.beam | None:
        """
        Get a beam object for the given screen; see
        :func:`~SimulationFramework.Modules.Beams.beamGroup.getScreen`

        Parameters
        ----------
        screen: str
            Name of screen

        Returns
        -------
        :class:`~SimulationFramework.Modules.Beams.beam`
            The beam object from `screen`

        Raises
        ------
        ValueError
            If `beams` is not a :class:`~SimulationFramework.Modules.Beams.beamGroup` object
        """
        if isinstance(self.beams, rbf.beamGroup):
            return self.beams.getScreen(screen)
        else:
            raise ValueError("Beam files have not been read in")

    def getScreenNames(self) -> dict:
        """
        Return the result of ``getScreens()`` on the loaded beam group

        Returns
        -------
        Dict
            The :class:`~SimulationFramework.Modules.Beams.beam` objects from the
            screen keyed by name

        Raises
        ------
        ValueError
            If `beams` is not a :class:`~SimulationFramework.Modules.Beams.beamGroup` object
        """
        if isinstance(self.beams, rbf.beamGroup):
            return self.beams.getScreens()
        else:
            raise ValueError("Beam files have not been read in")

    def element(self, element: str, field: str | None = None) -> Any | frameworkElement:
        """
        Get an element definition from the framework object.

        Parameters
        ----------
        element: str
            Element to retrieve
        field: str | None
            Field of that element to retrieve

        Returns
        -------
        Any or frameworkElement
            The `field` of `element`; or the entire
            :class:`~SimulationFramework.Framework_objects.frameworkElement` if
            `field` is falsy (its items are also pretty-printed) or the element
            has no such attribute (a warning is issued)
        """
        elem = self.framework.getElement(element)
        if field:
            try:
                return getattr(elem, field)
            except AttributeError:
                warn(f"{elem} does not have field {field}; returning entire element")
                return elem
        else:
            # Print the element's items, dropping disallowed keys and removing
            # the "object" substring from key names for readability.
            pprint(
                {
                    k.replace("object", ""): v
                    for k, v in elem.items()
                    if k not in disallowed
                }
            )
            return elem
def load_directory(
    directory: str = ".", twiss: bool = True, beams: bool = False, **kwargs
) -> frameworkDirectory:
    """
    Build a :class:`frameworkDirectory` for a SimFrame tracking run

    Parameters
    ----------
    directory: str
        Directory of the tracking run
    twiss: bool
        Passed to :class:`frameworkDirectory` as its ``twiss`` flag
    beams: bool
        Passed to :class:`frameworkDirectory` as its ``beams`` flag
    **kwargs
        Further keyword arguments forwarded to :class:`frameworkDirectory`

    Returns
    -------
    frameworkDirectory
        The loaded directory object
    """
    return frameworkDirectory(
        directory=directory,
        twiss=twiss,
        beams=beams,
        **kwargs,
    )