Source code for SMITER

import SMITER_ORB__POA
import SALOME_ComponentPy
import SALOME_Embedded_NamingService_ClientPy
import SALOME_DriverPy
import smardda_parameters
import re
import math
import os
import numpy as np

import SALOMEDS

from SMITER_utils import (findOrCreateComponent, objectID, moduleName, caseID,
                          geoqID, hdsgenID, powcalID, eqdskID, magtfmID,
                          unknownID, getStudy, verbose)
import json

from omniORB import CORBA
from salome.kernel.logger import Logger
from salome.kernel import termcolor
logger = Logger('SMITER', color=termcolor.RED_FG)

if verbose():
    logger.showDebug()

myORB = CORBA.ORB_init([''], CORBA.ORB_ID)

# Map SObject.
__entryToIORMap__ = {}

def ObjectToString(object):
    """Get IOR for object
    """
    return myORB.object_to_string(object)


def StringToObject(string):
    """Get object from IOR string
    """
    return myORB.string_to_object(string)

"""Because in python2 json.load and json.loads create unicode objects and not
string object, the following example has been took from
https://stackoverflow.com/questions/956867/how-to-get-string-objects-instead\
-of-unicode-from-json/33571117#33571117, which returns string objects instead
of unicode objects
"""

def json_loads(json_text):
    return _byteify(
        json.loads(json_text, object_hook=_byteify),
        ignore_dicts=True)

def _byteify(data, ignore_dicts = False):
    # if this is a unicode string, return its string representation

    # The following is not needed in Python >3
    # if isinstance(data, unicode):
    #     return data.encode('utf-8')
    # if this is a list of values, return list of byteified values
    if isinstance(data, list):
        return [ _byteify(item, ignore_dicts=True) for item in data ]
    # if this is a dictionary, return dictionary of byteified keys and values
    # but only if we haven't already byteified it
    if isinstance(data, dict) and not ignore_dicts:
        return {
            _byteify(key, ignore_dicts=True): _byteify(value, ignore_dicts=True)
            for key, value in data.items()
        }
    # if it's anything else, return it in its original form
    return data

class SMITER(SMITER_ORB__POA.SMITER_Gen,
             SALOME_ComponentPy.SALOME_ComponentPy_i,
             SALOME_DriverPy.SALOME_DriverPy_i):
    """Construct an instance of SMITER module engine.
    The class SMITER implements CORBA interface SMITER_Gen (see
    SMITER_Gen.idl). It is inherited from the classes SALOME_ComponentPy_i
    (implementation of Engines::EngineComponent CORBA interface - SALOME
    component) and SALOME_DriverPy_i (implementation of SALOMEDS::Driver CORBA
    interface - SALOME module's engine).
    """

    def __init__(self, orb, poa, contID, containerName, instanceName,
                 interfaceName):
        SALOME_ComponentPy.SALOME_ComponentPy_i.__init__(self, orb, poa,
                    contID, containerName, instanceName, interfaceName, False)
        SALOME_DriverPy.SALOME_DriverPy_i.__init__(self, interfaceName)

        #
        emb_ns = self._contId.get_embedded_NS_if_ssl()
        import CORBA
        if CORBA.is_nil(emb_ns):
            self._naming_service = SALOME_ComponentPy.SALOME_NamingServicePy_i( self._orb )
        else:
            self._naming_service = SALOME_Embedded_NamingService_ClientPy.SALOME_Embedded_NamingService_ClientPy(emb_ns)
        pass

        logger.debug('SMITER engine instance created')

    def getVersion(self):
        """Get version information.
        """
        import salome_version
        return salome_version.getVersion("SMITER", True)

    def makeBanner(self, name):
        """Generate hello banner.
        """
        banner = "Hello %s!" % name
        return banner

    def raiseAnException(self):
        """Intentionnally raises an exception for test purposes.
        """
        import SALOME
        exData = SALOME.ExceptionStruct(SALOME.BAD_PARAM, "Test exception in"
                                        " raiseAnException()", '', 0)
        raise SALOME.SALOME_Exception(exData)

    def createObject(self, name):
        """Create object.
        """
        study = getStudy()
        builder = study.NewBuilder()
        father = findOrCreateComponent()
        object = builder.NewObject(father)
        attr = builder.FindOrCreateAttribute(object, "AttributeName")
        attr.SetValue(name)
        attr = builder.FindOrCreateAttribute(object, "AttributeLocalID")
        attr.SetValue(objectID())
        # self.PublishInStudy(study, object, geoq.GEOQ(name)._this(), name)

    def CanPublishInStudy(self, IOR):
        """Enable publishing of objects in study.

        Arguments:
            IOR (string): Check objects type.
        """
        return True

    def publishCase(self, study, case):
        """Publishes study to the study object browser
        """
        # First publish the case object
        caseName = case.getName()
        caseSObj = self.PublishInStudy(study, None, case, caseName)
        # Now we publish the rest of the objects
        wall = case.getWall()
        if wall is None:
            pass
        else:
            wallSObj = self.PublishInStudy(study, caseSObj, wall,
                                           wall.getName())
            case.setWallEntry(wallSObj.GetID())

        target = case.getTarget()
        targetSObj = self.PublishInStudy(study, caseSObj, target,
                                         target.getName())
        case.setTargetEntry(targetSObj.GetID())

        shadow = case.getShadow()
        shadowSObj = self.PublishInStudy(study, caseSObj, shadow,
                                         shadow.getName())
        case.setShadowEntry(shadowSObj.GetID())

        hdsgen = case.getHDSGEN()
        hdsgenSObj = self.PublishInStudy(study, caseSObj, hdsgen,
                                         hdsgen.getName())
        case.setHdsgenEntry(hdsgenSObj.GetID())

        powcal = case.getPOWCAL()
        powcalSObj = self.PublishInStudy(study, caseSObj, powcal,
                                         powcal.getName())
        case.setPowcalEntry(powcalSObj.GetID())

        magtfm = case.getMAGTFM()
        if magtfm is not None:
            magtfmSObj = self.PublishInStudy(study, caseSObj, magtfm,
                                             magtfm.getName())
            case.setMAGTFMEntry(magtfmSObj.GetID())

        eqdsk = case.getEQDSK()
        eqdskName = eqdsk.getName().rsplit('/', 1)[-1] # Basename
        eqdskSObj = self.PublishInStudy(study, caseSObj, eqdsk, eqdskName)
        case.setEqdskEntry(eqdskSObj.GetID())

    def PublishInStudy(self, study, sobject, object, name):
        """Publishes object to study
        """

        if not object:
            # Nothing to do
            return sobject

        builder = study.NewBuilder()

        # Initiate command transaction
        builder.NewCommand()

        if not sobject:
            father = findOrCreateComponent()
            newSobject = builder.NewObject(father)
        else:
            # Create a child object under SObject
            newSobject = builder.NewObject(sobject)


        attr = builder.FindOrCreateAttribute(newSobject, 'AttributeName')
        if not name:
            name = object.getName()
        attr.SetValue(name)
        attr = builder.FindOrCreateAttribute(newSobject, 'AttributeIOR')
        attr.SetValue(ObjectToString(object))

        attr = builder.FindOrCreateAttribute(newSobject, 'AttributeLocalID')
        pixmap = builder.FindOrCreateAttribute(newSobject, 'AttributePixMap')
        if object._narrow(CASE):
            attr.SetValue(caseID())
            pixmap.SetPixMap('case_right_icon.png')
        elif object._narrow(GEOQ):
            attr.SetValue(geoqID())
            pixmap.SetPixMap('shadow_icon.png')
        elif object._narrow(HDSGEN):
            attr.SetValue(hdsgenID())
            pixmap.SetPixMap('hdsgen_icon.png')
        elif object._narrow(POWCAL):
            attr.SetValue(powcalID())
            pixmap.SetPixMap('powcal_icon.png')
        elif object._narrow(EQDSK):
            attr.SetValue(eqdskID())
            pixmap.SetPixMap('eqdsk_icon.png')
        elif object._narrow(MAGTFM):
            attr.SetValue(magtfmID())
            pixmap.SetPixMap('magtfm_icon.png')
        else:
            attr.SetValue(unknownID())
        # End and start the transaction
        builder.CommitCommand()
        return newSobject

    def Save(self, component, URL, isMultiFile):
        """Saves data in the form of a dictionary which is then transformed
        into JSON.
        """
        logger.debug('SMITER.Save Started')
        data2Save = {}
        study = getStudy()
        itr = study.NewChildIterator(component)
        itr.InitEx(1)
        while itr.More():
            so = itr.Value()
            soId = so.GetID()
            itr.Next()
            obj = so.GetObject()
            if obj:
                if obj._narrow(SMITER_ORB__POA.GEOQ):
                    TYPE = 'Geoq'
                elif obj._narrow(SMITER_ORB__POA.HDSGEN):
                    TYPE = 'Hdsgen'
                elif obj._narrow(SMITER_ORB__POA.POWCAL):
                    TYPE = 'Powcal'
                elif obj._narrow(SMITER_ORB__POA.EQDSK):
                    TYPE = 'Eqdsk'
                elif obj._narrow(SMITER_ORB__POA.CASE):
                    TYPE = 'Case'
                elif obj._narrow(SMITER_ORB__POA.MAGTFM):
                    TYPE = 'Magtfm'
                else:
                    continue
                if TYPE in 'GeoqHdsgenPowcalMagtfm':
                    if obj.modified():
                        paramList = obj.returnListOfChangedVariables()
                        data = {key: obj.getVariable(key) for key in paramList}
                    else:
                        data = {}

                    if TYPE == 'Geoq':
                        data['geoqType'] = obj.getType()
                        data['meshEntry'] = obj.getMeshEntry()
                    if TYPE == 'Magtfm':
                        data['mag.txt'] = obj.getMAG()

                elif TYPE == 'Eqdsk':
                    # Object is Eqdsk which is different
                    data = obj.generateText()
                elif TYPE == 'Case':
                    # Gather the necessary entry regarding child objects
                    entries = [obj.getWallEntry(), obj.getTargetEntry(),
                            obj.getShadowEntry(), obj.getHDSGENEntry(),
                            obj.getPOWCALEntry(), obj.getEQDSKEntry(),
                            obj.getMAGTFMEntry()]
                    data = [entries]
                else:
                    continue
                data2Save[soId] = {
                    'type': TYPE,
                    'name': obj.getName(),
                    'data': data
                }
        stream = json.dumps(data2Save)
        logger.debug('SMITER.Save Finished')
        return stream.encode()

    def Load(self, component, stream, theURL, isMultiFile):
        logger.debug('SMITER.Load Started')
        """Reads the dictionary we saved
        """
        __entryToIORMap__.clear()

        if isMultiFile:
            pass

        try:
            unpack = json_loads(stream.decode())
        except Exception as e:
            return False

        # First traverse and get the case objects
        cases = []
        for ID in unpack:
            TYPE = unpack[ID]['type']
            if TYPE == 'Case':
                obj = CASE()
                obj.setName(unpack[ID]['name'])
                if isinstance(unpack[ID]['data'][-1], dict):
                    results = unpack[ID]['data'][1]
                    if results:
                        for result in results:
                            obj.saveResult(result, results[result])
                    cases.append((obj, unpack[ID]['data'][0]))
                else:
                    cases.append((obj, unpack[ID]['data'][0]))
                pointer = obj._this()
                __entryToIORMap__[ID] = ObjectToString(pointer)

        for ID in unpack:
            # ID is the SObject entry
            TYPE = unpack[ID]['type']

            if TYPE == 'Geoq':
                obj = GEOQ()
            elif TYPE == 'Hdsgen':
                obj = HDSGEN()
            elif TYPE == 'Powcal':
                obj = POWCAL()
            elif TYPE == 'Eqdsk':
                obj = EQDSK()
            elif TYPE == 'Magtfm':
                obj = MAGTFM()
            else:
                continue
            # Check association in a case object

            for smiterCase, entries in cases:
                if ID in entries:
                    if TYPE == 'Geoq':
                        geoqType = unpack[ID]['data']['geoqType']
                        if geoqType == 'target':
                            smiterCase.setTarget(obj._this())
                            smiterCase.setTargetEntry(ID)
                        elif geoqType == 'shadow':
                            smiterCase.setShadow(obj._this())
                            smiterCase.setShadowEntry(ID)
                        elif geoqType == 'wall':
                            smiterCase.setWall(obj._this())
                            smiterCase.setWallEntry(ID)
                        else:
                            continue
                    elif TYPE == 'Hdsgen':
                        smiterCase.setHDSGEN(obj._this())
                        smiterCase.setHdsgenEntry(ID)
                    elif TYPE == 'Powcal':
                        smiterCase.setPOWCAL(obj._this())
                        smiterCase.setPowcalEntry(ID)
                    elif TYPE == 'Eqdsk':
                        smiterCase.setEQDSK(obj._this())
                        smiterCase.setEqdskEntry(ID)
                    elif TYPE == 'Magtfm':
                        smiterCase.setMAGTFM(obj._this())
                        smiterCase.setMAGTFMEntry(ID)
                    else:
                        continue
            # Set data and map SObject to CORBA instance of object.
            obj.setName(unpack[ID]['name'])
            if TYPE in 'GeoqHdsgenPowcalMagtfm':
                # Its important to first set the GEOQ type, so that the
                # proper DEFAULT VARIABLES are loaded, otherwise no variable
                # can be set.
                if TYPE == 'Geoq':
                    obj.setType(unpack[ID]['data']['geoqType'])
                    obj.setMeshEntry(unpack[ID]['data']['meshEntry'])
                if TYPE == 'Magtfm':
                    obj.setMAG(unpack[ID]['data']['mag.txt'])

                for key in unpack[ID]['data']:
                    if unpack[ID]['data'] == '':
                        obj.clearVariable(key)
                    else:
                        obj.setVariable(key, unpack[ID]['data'][key])
            else:
                # Use lazy set instead of reading the EQDSK data.
                # obj.read(unpack[ID]['data'])
                obj.setEqdskContent(unpack[ID]['data'])
                # EQDSK.
                # obj.read(unpack[ID]['data'])
            pointer = obj._this()
            __entryToIORMap__[ID] = ObjectToString(pointer)
        logger.debug('SMITER.Load Finished')
        return True

    def Close(self, theComponent):
        logger.debug('SMITER.Close: Called')

    def LocalPersistentIDToIOR(self, theSObject, PersistentID, isMultiFile,
                               isASCII):

        if PersistentID in __entryToIORMap__:
            return __entryToIORMap__[PersistentID]

    def DumpPython(self, isPublished, isMultiFile):
        """Dump module data to the Python script.
        """

        abuffer = []
        names = []
        study = getStudy()
        father = study.FindComponent(moduleName())
        if father:
            iter = study.NewChildIterator(father)
            while iter.More():
                name = iter.Value().GetName()
                if name:
                    names.append(name)
                iter.Next()

        if names:
            abuffer += ["from salome import lcc"]
            abuffer += ["import SMITER_ORB"]
            abuffer += [""]
            abuffer += ["pyhello = lcc.FindOrLoadComponent( "
                        "'FactoryServerPy', '%s' )" % moduleName()]
            abuffer += [""]
            abuffer += ["pyhello.createObject( theStudy, '%s' )" % name
                        for name in names]
            abuffer += [""]

        if isMultiFile:
            abuffer = ["  " + s for s in abuffer]
            abuffer[0:0] = ["def RebuildData( theStudy ):"]
            abuffer += ["  pass"]
        abuffer += ["\0"]
        res = "\n".join(abuffer)
        return (res.encode(), 1)


class Base(object):
    def __init__(self, name:str = ''):
        self.order = []
        self.params = {}
        self.variables = {}
        self.name = name
        self._defaultValueModified = False

    def clearVariable(self, name: str):
        """Because CORBA disallows passing empty string as parameters, this
        is used instead. Strange though, retrieving empty string gives no
        errors.
        """

        for section in self.order:
            if name in self.params[section]:
                if self.params[section] == '':
                    if name in self.variables:
                        self.variables.pop(name)
                        if not len(self.variables):
                            self._defaultValueModified = False
                    return True
                else:
                    self.variables[name] = ''

        return False

    def setVariable(self, name: str, val: str):
        """Sets the new value to variable name. If the value is the same as
        the one in the default options, then the value is not changed or in
        other words no actual change has been added.
        """

        # Strip the value
        val = val.strip()
        if not val:
            return False

        for section in self.order:
            if name in self.params[section]:
                if val == self.params[section][name]:
                    if name in self.variables:
                        # Changing the user modified value back to default
                        # value.
                        self.variables.pop(name)
                        if not len(self.variables):
                            self._defaultValueModified = False
                        return True
                    else:
                        # The same value as the default one
                        return True

                else:
                    self.variables[name] = val
                    self._defaultValueModified = True
                    return True

        return False

    def getVariable(self, name: str):
        """Returns the value of parameter. If the parameter is not in the
        self.variables attribute, then we can try to get the default Value.
        """
        if name not in self.variables:
            for section in self.order:
                if name in self.params[section]:
                    return self.params[section][name]
            return ''

        return self.variables[name]

    def getName(self):
        """Returns the name.

        Returns:
            name (str): String which has whites-paces replaced with `_`.
        """
        return self.name

    def setName(self, name):
        """Set the name for object. Usually it should be the same as the name
        of the Salome Object that references to the object.
        """
        self.name = name

    def returnListOfChangedVariables(self):
        """Returns a list of parameters that have non-default values.
        """
        return [key for key in self.variables.keys()]

    def createCTL(self):
        """Creates a string in the form of CTL.

        Returns:
            ctl (str): Parameters stored in the form of FORTRAN namelist.
        """
        ctl = ''
        for section in self.order:
            ctl += '&%s\n' % section

            for parameter in self.params[section]:
                if parameter in self.variables:
                    value = self.variables[parameter]

                    if value:
                        if section in 'inputfilesmagfiles':
                            ctl += "  %s = '%s',\n" % (parameter, value)
                        else:
                            ctl += '  %s = %s,\n' % (parameter, value)
            ctl += '/\n'
        return ctl

    def modified(self):
        """Returns true if there are non-default values stored.
        """
        return self._defaultValueModified


[docs]class POWCAL(SMITER_ORB__POA.POWCAL, Base): """Powcal interface implementation """ def __init__(self, name=''): Base.__init__(self, name) self.order = smardda_parameters.POWCAL_order self.params = smardda_parameters.POWCAL_params
[docs]class HDSGEN(SMITER_ORB__POA.HDSGEN, Base): """HDSGEN interface implementation """ def __init__(self, name=''): Base.__init__(self, name) self.order = smardda_parameters.HDSGEN_order self.params = smardda_parameters.HDSGEN_params
class MAGTFM(SMITER_ORB__POA.MAGTFM, Base): def __init__(self, name=''): Base.__init__(self, name) self.order = smardda_parameters.MAGTFM_order self.params = smardda_parameters.MAGTFM_params self.mag = '' def setMAG(self, mag): """Set the vacuum magnetic field data. """ self.mag = mag def getMAG(self): """Get the vacuum magnetic field data. """ return self.mag
[docs]class GEOQ(SMITER_ORB__POA.GEOQ, Base): """GEOQ interface implementation. Attributes: name (string): Name for the geoq instance type (string): Type of geoq instance. Either target or shadow. variables (dict): A dictionary for non default values for parameters _defaultValueModified (bool): Is true if nondefault values are present meshVTK (string): VTK format of mesh. It is generated on instance """ def __init__(self, name='', type=''): Base.__init__(self, name) self.order = smardda_parameters.GEOQ_order self.params = smardda_parameters.GEOQ_params self.type = type self.meshEntry = '' if type: self.setType(type)
[docs] def setMeshEntry(self, entry): """If used with SALOME study, then the entry is the SObject from Object browser that represents the mesh we wish to use for GEOQ. """ self.meshEntry = entry
[docs] def getMeshEntry(self): """Returns the entry of the Salome object that references to the GEOQ object. """ return self.meshEntry
[docs] def setType(self, type): """Refers to the type to be used when running SMARDA case. Target means that the mesh will be later used for power calculation. Shadow means that it will be used by hdsgen and powcal. """ self.type = type
[docs] def getType(self): """Returns the type of GEOQ. """ return self.type
[docs]class CASE(SMITER_ORB__POA.CASE): """Case object for running SMARDDA. It contains references to all input objects for easier use. Accordingly there are set and get functions for each object holding the input data. The function ``setInputParameters`` sets the parameters of the control CTL files, which requires the input or output data from other input objects. Attributes: wall (:py:mod:`GEOQ`): GEOQ object that contains parameters for wall wallEntry (str): Entry for wall GEOQ Salome Object. If no Salome Object is present then an empty string is placed in. target (:py:mod:`GEOQ`): GEOQ object that contains parameters for target targetEntry (str): Entry for target GEOQ Salome Object. If no Salome object is present then an empty string is placed in. shadow (:py:mod:`GEOQ`): GEOQ object that contains parameters for shadow shadowEntry (str):Entry for shadow GEOQ Salome Object. If no Salome Object is present then an empty string is placed in. hdsgen (:py:mod:`HDSGEN`): HDSGEN object, HDS means Hierarchical Data Structure hdsgenEntry (str): Entry for target HDSGEN Salome Object. If no Salome object is present then an empty string is placed in. powcal (:py:mod:`POWCAL`): POWCAL object powcalEntry (str): Entry for target POWCAL Salome Object. If no Salome object is present then an empty string is placed in. eqdsk (:py:mod:`EQDSK`): EQDSK object, that contains information about equilibrium. eqdskEntry (str): Entry for target EQDSK Salome Object. If no Salome object is present then an empty string is placed in. """ def __init__(self, caseName=''): # self.geoqTarget = geoq.GEOQ() # Empty class # self.geoqShadow = geoq.GEOQ() # self.hdsgen = hdsgen.HDSGEN() # self.powcal = powcal.POWCAL() # self.eqdsk = eqdsk.EQDSK() self.caseName = caseName self.wall = None self.wallEntry = '' self.target = None self.targetEntry = '' self.shadow = None self.shadowEntry = '' self.hdsgen = None self.hdsgenEntry = '' self.powcal = None self.powcalEntry = '' self.magtfm = None self.magtfmEntry = '' self.eqdsk = None self.eqdskEntry = ''
[docs] def setName(self, caseName): """Set a name for the case. It should be the same as the name displayed to the **Salome Object** in **Object browser**. Arguments: caseName (str): Name for the case. """ self.caseName = caseName
[docs] def getName(self): """Returns the name. Returns: name (str): String which has whites-paces replaced with `_`. """ return self.caseName.replace(' ', '_')
[docs] def setTarget(self, target): """Sets the target :py:mod:`GEOQ` object. Arguments: target (:py:mod:`GEOQ`): GEOQ object for target """ self.target = target
def setTargetEntry(self, entry): self.targetEntry = entry
[docs] def getTarget(self): """Returns the :py:mod:`GEOQ` object of the target. Returns: target (:py:mod:`GEOQ`): GEOQ object for the target """ return self.target
[docs] def getTargetEntry(self): """Returns the *Salome Object* entry referencing to the target "py:mod:`GEOQ` object. Returns: targetEntry (str): Entry for the *Salome Object*. """ return self.targetEntry
[docs] def setWall(self, wall): """Sets the wall :py:mod:`GEOQ` object. Arguments: wall (:py:mod:`GEOQ`): GEOQ object for wall """ self.wall = wall
def setWallEntry(self, entry): self.wallEntry = entry
[docs] def getWall(self): """Returns the :py:mod:`GEOQ` object of the wall. Returns: wall (:py:mod:`GEOQ`): GEOQ object for the wall. """ return self.wall
[docs] def getWallEntry(self): """Returns the *Salome Object* entry referencing to the wall "py:mod:`GEOQ` object. Returns: wallEntry (str): Entry for the *Salome Object*. """ return self.wallEntry
[docs] def setShadow(self, shadow, entry=''): """Sets the shadow :py:mod:`GEOQ` object. Arguments: shadow (:py:mod:`GEOQ`): GEOQ object for shadow. """ self.shadow = shadow
def setShadowEntry(self, entry): self.shadowEntry = entry
[docs] def getShadow(self): """Returns the :py:mod:`GEOQ` object of the shadow. Returns: shadow (:py:mod:`GEOQ`): GEOQ object for the shadow. """ return self.shadow
[docs] def getShadowEntry(self): """Returns the *Salome Object* entry referencing to the shadow "py:mod:`GEOQ` object. Returns: shadowEntry (str): Entry for the *Salome Object*. """ return self.shadowEntry
[docs] def setHDSGEN(self, hdsgen, entry=''): """Sets the hdsgen :py:mod:`HDSGEN` object. Arguments: hdsgen (:py:mod:`HDSGEN`): HDSGEN object. """ self.hdsgen = hdsgen
def setHdsgenEntry(self, entry): self.hdsgenEntry = entry
[docs] def getHDSGEN(self): """Returns the :py:mod:`HDSGEN` object of the hdsgen. Returns: hdsgen (:py:mod:`HDSGEN`): HDSGEN object for the hdsgen. """ return self.hdsgen
[docs] def getHDSGENEntry(self): """Returns the *Salome Object* entry referencing to the hdsgen "py:mod:`HDSGEN` object. Returns: hdsgenEntry (str): Entry for the *Salome Object*. """ return self.hdsgenEntry
[docs] def setPOWCAL(self, powcal, entry=''): """Sets the powcal :py:mod:`POWCAL` object. Arguments: powcal (:py:mod:`POWCAL`): POWCAL object. """ self.powcal = powcal
def setPowcalEntry(self, entry): self.powcalEntry = entry
[docs] def getPOWCAL(self): """Returns the :py:mod:`POWCAL` object of the powcal. Returns: powcal (:py:mod:`POWCAL`): POWCAL object for the powcal. """ return self.powcal
[docs] def getPOWCALEntry(self): """Returns the *Salome Object* entry referencing to the powcal "py:mod:`POWCAL` object. Returns: powcalEntry (str): Entry for the *Salome Object*. """ return self.powcalEntry
[docs] def setMAGTFM(self, magtfm): """Sets the magtfm :py:mod:`MAGTFM` object. Arguments: magtfm (:py:mod:`MAGTFM`): MAGTFM object. """ self.magtfm = magtfm
def setMAGTFMEntry(self, entry): self.magtfmEntry = entry def getMAGTFM(self): return self.magtfm def getMAGTFMEntry(self): return self.magtfmEntry
[docs] def setEQDSK(self, equilibrium, entry=''): """Sets the eqdsk :py:mod:`EQDSK` object. Arguments: eqdsk (:py:mod:`EQDSK`): EQDSK object for shadow. """ self.eqdsk = equilibrium
def setEqdskEntry(self, entry): self.eqdskEntry = entry
[docs] def getEQDSK(self): """Returns the :py:mod:`EQDSK` object of the eqdsk. Returns: eqdsk (:py:mod:`EQDSK`): EQDSK object for the eqdsk. """ return self.eqdsk
[docs] def getEQDSKEntry(self): """Returns the *Salome Object* entry referencing to the eqdsk "py:mod:`EQDSK` object. Returns: eqdskEntry (str): Entry for the *Salome Object*. """ return self.eqdskEntry
[docs] def setInputParameters(self): """After you set the GEOQ target, GEOQ shadow, HDSGEN hdsgen and POWCAL powcal, this function properly sets the input parameters. If the input parameters are changed after the case is created, then this can be called again manually, or just run the ComputeCase, which always call this before computing. """ wall = self.getWall() # There can also be no wall. target = self.getTarget() if target is None: return False shadow = self.getShadow() if shadow is None: return False hdsgen = self.getHDSGEN() if hdsgen is None: return False powcal = self.getPOWCAL() if powcal is None: return False eqdsk = self.getEQDSK() if eqdsk is None: return False magtfm = self.getMAGTFM() skipMagtfm = False plot_bcartx = False if magtfm is None: skipMagtfm = True targetName = target.getName() shadowName = shadow.getName() hdsgenName = hdsgen.getName() eqdskName = eqdsk.getName().rsplit('/', 1)[-1] # Get the basename if not skipMagtfm: magtfmName = magtfm.getName() # Get the name # Check if plot_bcartx is set to true _flag = magtfm.getVariable('plot_bcartx') plot_bcartx = True if _flag == '.true.' else False # Setting magtfm input parameters if not skipMagtfm: magtfm.setVariable('mag_input_file', f'mag/{magtfmName}.txt') if plot_bcartx: magtfm.setVariable('mag_output_file', 'null') pass else: magtfm.setVariable('mag_output_file', f'mag/{magtfmName}.sp3') # Setting wall input parameters if wall: wallName = wall.getName() wall.setVariable('vtk_input_file', f'wall/{wallName}.vtk') wall.setVariable('eqdsk_input_file', f'eqdsk/{eqdskName}') # Setting target input parameters target.setVariable('vtk_input_file', f'target/{targetName}.vtk') target.setVariable('eqdsk_input_file', f'eqdsk/{eqdskName}') if not skipMagtfm: if plot_bcartx: target.clearVariable('beq_vacuum_field_file') pass else: target.setVariable('beq_vacuum_field_file', f"'mag/{magtfmName}.sp3'") # Setting shadow input parameters shadow.setVariable('vtk_input_file', f'shadow/{shadowName}.vtk') shadow.setVariable('eqdsk_input_file', f'eqdsk/{eqdskName}') if not skipMagtfm: if plot_bcartx: shadow.setVariable('beq_vacuum_field_file', f"'mag/magtfm_bx.txt'") pass else: shadow.setVariable('beq_vacuum_field_file', f"'mag/{magtfmName}.sp3'") # Setting hdsgen input parameters hdsgen.setVariable('vtk_input_file', f'shadow/geoq_geoqm.vtk') # Setting powcal input parameters powcal.setVariable('vtkres_input_file', 'target/geoq_geoqm.vtk') powcal.setVariable('vtk_input_file', 'shadow/geoq_geoqm.vtk') powcal.setVariable('geoq_input_file', 'shadow/geoq_geoq.out') powcal.setVariable('hds_input_file', 'hds/hdsgen_hds.hds') return True
[docs] def generateSaveString(self) -> str: """Create a dictionary containing all information of the case and then output it as a string. """ out = {} if self.wall is not None: # Wall GEOQ parameters out['wall'] = {param: self.wall.getVariable(param) for param in self.wall.returnListOfChangedVariables()} if self.target is not None: # Target GEOQ parameters out['target'] = {param: self.target.getVariable(param) for param in self.target.returnListOfChangedVariables()} if self.shadow is not None: out['shadow'] = {param: self.shadow.getVariable(param) for param in self.shadow.returnListOfChangedVariables()} if self.hdsgen is not None: out['hds'] = {param: self.hdsgen.getVariable(param) for param in self.hdsgen.returnListOfChangedVariables()} if self.powcal is not None: out['powcal'] = {param: self.hdsgen.getVariable(param) for param in self.hdsgen.returnListOfChangedVariables()} # Eqdsk data if self.eqdsk is not None: out['eqdsk'] = self.eqdsk.generateText() # MAG data if self.mag is not None: out['mag'] = self.mag.getMAG() try: jsonOut = json.dumps(out) except Exception as e: print(e) print('Failed to create JSON string from CASE data dictionary.') return '' return jsonOut
[docs] def loadString(self, jsonString: str) -> bool: """Take a json string containing dictionary with all paremeters for the case. """ try: inp = json.loads(jsonString) except Exception as e: print(e) print('Failed to load the string with JSON.') return False if 'wall' in inp: self.wall = GEOQ() return True
""" IMPORTANT! Since the EQDSK G format is no longer a consistent standard (deviation, comments, non-fixed format), this utility is only used for saving the contents of the loaded EQDSK file and saving to a file whenever a SMITER case is run. The parsing is still done for plotting purposes or for just debugging of the format, i.e., to see if the EQDSK file conforms to the EQDSK G format. """ # Wrapper function that checks if an EQDSK instance needs to parse the data, # when a request for a quantity happens. def readEqdskContentsIfRequired(f): def wrapper(*args): instance = args[0] if not instance.hasItBeenRead: instance.read(instance.eqdskString) return f(*args) return wrapper
[docs]class EQDSK(SMITER_ORB__POA.EQDSK): """EQDSK interface implementation The following quantities are read from a ``EQDSK G``. Attributes: PSIRZ (arr): Array of plasma psi values in units WEBER. CURRENT (float): Plasma current in Ampere RDIM (float): Horizontal dimension in meter of computational box ZDIM (float): Vertical dimension in meter of computational box NW (int): Number of horizontal R grid points NH (int): Number of vertical Z grid points LIMITR (int): Number of limiter points RLIM (arr): R of surrounding limiter contour in meter ZLIM (arr): Z of surrounding limiter contour in meter NBBBS (int): Number of boundary points RBBBS (arr): R of boundary points in meter ZBBBS (arr): Z of boundary points in meter RMAXIS (arr): R of magnetic axis in meter ZMAXIS (arr): Z of magnetic axis in meter FPOL (arr): Poloidal current function in m-T, F=RB_T on flux grid PRES (arr): Plasma pressure in nt/m^2 on uniform flux grid FFPRIM (arr): FF'(Psi) in (mT)^2/(Weber /rad) on uniform flux grid PPRIME (arr): P'(Psi) in (nt/m^2) / (Weber /rad) on uniform flux grid QPSI (arr): q values on uniform flux grid from axis to boundary SIMAG (arr): poloidal flux at magnetic axis in Weber /rad SIBRY (arr): poloidal flux at the plasma boundary in Weber /rad RCENTR (float): R in meter of vacuum toroidal magnetic field BCENTR BCENTR (float): Vacuum toroidal magnetic field in Tesla at RCENTR RLEFT (float): Minimum R in meter of rectangular computational box ZMID (float): Z of center of computational box in meter .. note:: The quantity FPOL is poloidal current function in m-T (r Bt). It's domain is the flux region from SIMAG to SIBRY. They are all accessible with the following example .. code-block:: python import eqdsk x = eqdsk.EQDSK() eqdskFile = '/path/to/eqdsk/format/file' # If SMITER is located on your machine x.openAndRead(eqdskFile) # If SMITER is used remotely you have to manually pass the contents # of the EQDSK file. In this case: with open(eqdskFile) as f: data = f.read() ok = x.read(data) # Then set the name of the eqdsk file if ok: # Reading Successful eqdskName = eqdskFile.rpslit('/', 1)[-1] # Get the name of the file x.setName(eqdskFile) else: print('Trouble reading EQDSK file %s' % eqdskFile) # If the file was read without a problem, you can access, i.e, the # geometry of the limiter x.RLIM x.ZLIM """ # Pattern for reading integer values separated with spaces integerPattern = u'(?:^|(?<=\s))-?[0-9]+(?:$|(?=\s))' intPatt = re.compile('.*' + integerPattern + '.*') # Pattern for reading scientific numbers scientificPattern = u'[+-]?[0-9]\.[0-9]{6,9}[eE][+-][0-9]{2}' # Pattern for reading both integers and scientific numbers. genPatt = re.compile(u'(?:%s|%s)' % (integerPattern, scientificPattern)) def __init__(self, name=''): self.name = name self.logMessage = '' self.resetValues() def resetValues(self): self.INPUT_EQDSK = '' self.HEADER = '' self.PSIRZ = [] # Plasma psi WEBER self.CURRENT = None self.RDIM = None # Size dimension for R and Z self.ZDIM = None self.NW = None # Dimension of grid self.NH = None self.LIMITR = None # Number of limiter points self.RLIM = [] # rlim self.ZLIM = [] # zlim self.NBBBS = None # Number of Boundary points self.RBBBS = [] # rbbbs self.ZBBBS = [] # zbbs self.RMAXIS = [] # Position of toroidal magnetic self.ZMAXIS = [] # field self.FPOL = [] self.PRES = [] self.FFPRIM = [] self.PPRIME = [] self.QPSI = [] self.SIMAG = None # Flux value at magnetix axis self.SIBRY = None # Flux value on boundary self.RCENTR = None # Reference value: position R, mag field B self.BCENTR = None # B value in RCENTR self.RLEFT = None # Minimum of rectangular computational box. self.ZMID = None # Vertical dimension of computational box """ Or in another case: The grid goes from : RLEFT to RLEFT + RDIM -ZDIM / 2 to ZDIM / 2 """ self.eqdskString = '' self.hasItBeenRead = False self.successfullRead = False def getName(self): """Returns the name. Returns: name (str): String which has whites-paces replaced with `_`. """ return self.name.replace(' ', '_') def setName(self, name): """Set a name for eqdsk.. It should be the same as the name displayed to the **Salome Object** in **Object browser**. Arguments: eqdskName (str): Name for the case. """ self.name = name def openAndRead(self, filePath): if not os.access(filePath, os.F_OK | os.R_OK): print('File does not exist or lacking permission to read it!') return False data = '' with open(filePath, 'r') as f: data = f.read() # Setting name self.setName(filePath.split('/')[-1]) return self.read(data) def logValue(self, msg, value, t='f'): if t == 'd': formatedMsg ='{0:>30} %d'.format(msg + ':') % value else: formatedMsg ='{0:>30} %f'.format(msg + ':') % value self.logMessage += formatedMsg + '\n' def log(self, msg): """Function used to save the parsing information of the EQDSK file. """ self.logMessage += msg + '\n' @readEqdskContentsIfRequired def getLog(self): return self.logMessage def setEqdskContent(self, eqdskString): """Set the full contents of an EQDSK file, but do not read it! """ self.eqdskString = eqdskString def getEqdskContent(self): """Return the full contents of an EQDSK file. """ return self.eqdskString def read(self, eqdskString): """Passing string instead of a filename, to avoid any troubles if this is used on a server. """ self.resetValues() self.eqdskString = eqdskString # Reset the log self.logMessage = 'Reading eqdsk file %s\n' % self.name LINES = eqdskString.splitlines() HEADER = LINES[0] self.HEADER = HEADER if len(self.HEADER) > 79: self.logMessage += 'First line is longer than 79 characters!\n' # regexHeader = '^.+[0-9] +?([0-9]+) +?([0-9]+)' regexHeader = '\s[0-9]\s+([0-9]+)\s+([0-9]+)' result = re.findall(regexHeader, HEADER) # self.log("Reading 1st line of EQDSK.") if result: nw, nh = result[0] self.NW = int(nw) self.NH = int(nh) self.logValue('Width', self.NW, t='d') self.logValue('Height', self.NH, t='d') else: self.log("Couldn't read the dimensions from the 1st line.") self.log("LINE:\n%s" % HEADER) return False # self.log('Done reading 1st line.') # self.log("Reading 2nd line of EQDSK") i = 1 result = self.readOneLine(LINES[i]) if result == -1: # self.log("[INFO] 2nd line must be a comment") while 1: # self.log("[INFO] Filtering the comments.") i += 1 result = self.readOneLine(LINES[i]) if result != -1: break # Precaution, if there are 100 wrong lines, just end the # reading if i > 100: return False try: self.RDIM = result[0] self.ZDIM = result[1] self.RCENTR = result[2] self.RLEFT = result[3] self.ZMID = result[4] self.logValue('Dimension R', self.RDIM) self.logValue('Dimension Z', self.ZDIM) self.logValue('Center R', self.RCENTR) self.logValue('Left R', self.RLEFT) self.logValue('Middle Z', self.ZMID) # self.log('Done reading 2nd line.') # self.log("Reading 3rd line of EQDSK") i += 1 result = self.readOneLine(LINES[i]) self.RMAXIS = result[0] self.ZMAXIS = result[1] self.SIMAG = result[2] self.SIBRY = result[3] self.BCENTR = result[4] self.logValue('Magnetic axis position R', self.RMAXIS) self.logValue('Magnetic axis position Z', self.ZMAXIS) self.logValue('Flux at magnetic axis', self.SIMAG) self.logValue('Flux at boundary', self.SIBRY) self.logValue('Center B', self.BCENTR) # self.log("Done reading 3rd line.") i += 1 # self.log('Reading 4th line of EQDSK') result = self.readOneLine(LINES[i]) self.CURRENT = result[0] self.logValue('Current', self.CURRENT) msg = "The 2nd value on line 4 of the EQDSK is not equal to " \ "the flux at magnetic axis read from the 3rd line" assert self.SIMAG == result[1], msg DUMMY = result[2] msg = "The 4th value on line 4 of the EQDSK is not equal to the"\ " magnetic axis R position, read from the 3rd line!" assert self.RMAXIS == result[3], msg DUMMY = result[4] # self.log("Done reading 4th line.") i += 1 # self.log("Reading 5th line of EQDSK") result = self.readOneLine(LINES[i]) msg = "The 1st value on line 5 is not equal to the magnetic " \ " axis Z position, read from the 3rd line!" assert self.ZMAXIS == result[0], msg DUMMY = result[1] msg = " The 3rd value on line 5 is not equal to the flux at " \ " boundary, read from the 3rd line" assert self.SIBRY == result[2], msg DUMMY = result[3] DUMMY = result[4] # self.log("Done reading 5th line.") line_index = i + 1 self.log("Reading values from 5th line on...") P1, P2 = self.readValues(LINES[line_index:]) N_VALUES = len(P1) + len(P2) TOTAL_VALUES = 5 * self.NW + self.NH * self.NW + 2 _I = 0 self.FPOL = P1[_I: _I + self.NW] _I += self.NW self.PRES = P1[_I: _I + self.NW] _I += self.NW self.FFPRIM = P1[_I: _I + self.NW] _I += self.NW self.PPRIME = P1[_I: _I + self.NW] _I += self.NW self.PSIRZ = [] for i in range(self.NH): self.PSIRZ.append(P1[_I: _I + self.NW]) _I += self.NW self.QPSI = P1[_I: _I + self.NW] _I += self.NW _I = 0 self.NBBBS = int(P2[_I]) _I += 1 self.LIMITR = int(P2[_I]) _I += 1 self.logValue('N boundary', self.NBBBS, t='d') self.logValue('N limiter', self.LIMITR, t='d') self.log("Read %d values." % N_VALUES) TOTAL_VALUES += (self.NBBBS + self.LIMITR) * 2 self.log('%d actual values required according to EQDSK G format.' % TOTAL_VALUES) self.RBBBS = [] self.ZBBBS = [] for i in range(self.NBBBS): self.RBBBS.append(P2[_I]) self.ZBBBS.append(P2[_I + 1]) _I += 2 self.RLIM = [] self.ZLIM = [] for i in range(self.LIMITR): self.RLIM.append(P2[_I]) self.ZLIM.append(P2[_I + 1]) _I += 2 except Exception as e: print(e) self.successfullRead = False return False self.successfullRead = True return True def readOneLine(self, line): """Reads the float values from one line. """ pattern = '-?[0-9]{1}\.[0-9]{9}[eE][-+]?[0-9]{2}' result = re.findall(pattern, line) if line.lstrip()[0].isalpha(): return -1 if len(result) == 5: # 5 float values on line. out = [float(el) for el in result] return out else: # Manually check the values out = [] for el in line.split(): try: out.append(float(el)) except Exception as e: pass if len(out) == 5: # 5 values. All ok return out else: # Must be a comment line return -1 def readValues(self, LINES): """Reads values from EQDSK lines. """ # Part one consists of values before the section of limiter and # boundary geometry. Basically meaning reading values until finding # two integer values. part_1 = [] # Part two are all values regarding plasma and boundary geometry. # HOPEFULLY, the lines start with two INTEGERS and not somewhere in # between. part_2 = [] # # Pattern for reading integer values separated with spaces # integerPattern = u'(?:^|(?<=\s))-?[0-9]+(?:$|(?=\s))' # intPatt = re.compile('.*' + integerPattern + '.*') # # Pattern for reading scientific numbers # scientificPattern = u'[+-]?[0-9]\.[0-9]{6,9}[eE][+-][0-9]{2}' # pattern = u'(?:%s|%s)' % (integerPattern, scientificPattern) # genPatt = re.compile(pattern) SWITCH = 1 for line in LINES: if SWITCH and self.intPatt.match(line): SWITCH = 0 result = self.genPatt.findall(line) if SWITCH: part_1 += [float(el) for el in result] else: part_2 += [float(el) for el in result] return part_1, part_2 def readOk(self): return self.successfullRead def generateText(self): """Generates an EQDSK format text. """ # return self.eqdskString # The problem with EQDSK G format is that is no longer a consistent # standard. Therefore the utility eqdsk.py is mainly used for reading # the value, maybe validating the format and using the read value # to plot data. Therefore when we want to save the contents of the # eqdskString, the eqdsk is no longer generated by the EQDSK G format # but the contents of the loaded eqdsk file is just printed. #--------------------------------------------------------------------- if not self.hasItBeenRead: self.read(self.eqdskString) if not self.successfullRead: return self.eqdskString self.generatedText = '' self.generatedText += self.HEADER + '\n' # First line DUMMY = 0.0 self._PerLineCount = 1 self.createRealLine(self.RDIM, self.ZDIM, self.RCENTR, self.RLEFT, self.ZMID) self.createRealLine(self.RMAXIS, self.ZMAXIS, self.SIMAG, self.SIBRY, self.BCENTR) self.createRealLine(self.CURRENT, self.SIMAG, DUMMY, self.RMAXIS, DUMMY) self.createRealLine(self.ZMAXIS, DUMMY, self.SIBRY, DUMMY, DUMMY) self.createRealLine(*self.FPOL) if not self.generatedText.endswith('\n'): self.generatedText += '\n' self._PerLineCount = 1 self.createRealLine(*self.PRES) if not self.generatedText.endswith('\n'): self.generatedText += '\n' self._PerLineCount = 1 self.createRealLine(*self.FFPRIM) if not self.generatedText.endswith('\n'): self.generatedText += '\n' self._PerLineCount = 1 self.createRealLine(*self.PPRIME) if not self.generatedText.endswith('\n'): self.generatedText += '\n' self._PerLineCount = 1 for psi_width in self.PSIRZ: self.createRealLine(*psi_width) if not self.generatedText.endswith('\n'): self.generatedText += '\n' self._PerLineCount = 1 self.createRealLine(*self.QPSI) if not self.generatedText.endswith('\n'): self.generatedText += '\n' self.createIntLine(self.NBBBS, self.LIMITR) self.generatedText += '\n' self._PerLineCount = 1 alternating_array = [None] * 2 * self.NBBBS alternating_array[::2] = self.RBBBS alternating_array[1::2] = self.ZBBBS self.createRealLine(*alternating_array) self.generatedText += '\n' self._PerLineCount = 1 if self.LIMITR: alternating_array = [None] * 2 * self.LIMITR alternating_array[::2] = self.RLIM alternating_array[1::2] = self.ZLIM self.createRealLine(*alternating_array) if not self.generatedText.endswith('\n'): self.generatedText += '\n' return self.generatedText def createRealLine(self, *args): Format = '%.9E' for el in args: stringReal = Format % el self.generatedText += ' ' * (16 - len(stringReal)) + stringReal if self._PerLineCount % 5 == 0: self.generatedText += '\n' self._PerLineCount += 1 def createIntLine(self, *args): for el in args: self.generatedText += ' ' + str(el) def mmMoveInR(self, mm): """ Radially moves the values of eqdsk for *mm* millimeters. All R relevant quantities: RLIM RBBBS RLEFT RMAXIS Note that these quantities are not necessarily in millimeters! """ if not self.successfullRead: return orderLim = '%.1E' % max(self.RLIM) orderLim = int(orderLim[-2:]) scale = 1.0 if orderLim == 0: # Meters scale = 1e-3 elif orderLim == 2: # Centimeters scale = 1e-2 elif orderLim == 3: # Millimeters scale = 1 else: # print('Strange units as RLIM!') return for i in range(len(self.RLIM)): self.RLIM[i] += scale * mm orderBoundary = '%.1E' % max(self.RBBBS) orderBoundary = int(orderBoundary[-2:]) scale = 1.0 if orderBoundary == 0: scale = 1e-3 elif orderBoundary == 2: scale = 1e-2 elif orderBoundary == 3: scale = 1 else: # print('Strange Units at RBBBS!') return for i in range(len(self.RBBBS)): self.RBBBS[i] += scale * mm orderLeftR = '%.1E' % self.RLEFT orderLeftR = int(orderLeftR[-2:]) scale = 1.0 if orderLeftR == 0: scale = 1e-3 elif orderLeftR == 2: scale = 1e-2 elif orderLeftR == 3: scale = 1 else: # print('Strange Units at RLEFT!') return self.RLEFT += scale * mm orderMagAxisR = '%.1E' % self.RLEFT orderMagAxisR = int(orderMagAxisR[-2:]) scale = 1.0 if orderMagAxisR == 0: scale = 1e-3 elif orderMagAxisR == 2: scale = 1e-2 elif orderMagAxisR == 3: scale = 1 else: # print('Strange Units at RMAXIS!') return self.RMAXIS += scale * mm def fluxOffset(self, offset): """Offsetting the flux quantities. SIMAG += offset SIBRY = offset PSIRZ += offset """ if not self.successfullRead: return self.SIMAG += offset self.SIBRY = offset X = len(self.PSIRZ) Y = len(self.PSIRZ[0]) for i in range(X): for j in range(Y): self.PSIRZ[i][j] += offset def convertTo_m(self, a): """Determine the size unit. If the units are in mm, divide the values by 1000. Arguments: a (array): Array of floats holding the R or Z values. """ order = int(math.log10(max([abs(el) for el in a]))) if order == 2: # Hopefully centimeters return [el / 100.0 for el in a] elif order == 3: # Hopefully mm return [el / 1000.0 for el in a] else: return a @readEqdskContentsIfRequired def getRBBBS(self): """Gets the R points for LCFS. """ return self.convertTo_m(self.RBBBS) @readEqdskContentsIfRequired def getZBBBS(self): """Gets the Z points for LCFS. Determine the size unit. If the units are in meters, multiply the values by 1000. """ return self.convertTo_m(self.ZBBBS) @readEqdskContentsIfRequired def getRLIM(self): """Gets the R points for wall silhouette. """ return self.convertTo_m(self.RLIM) @readEqdskContentsIfRequired def getZLIM(self): """Gets the Z points for wall silhouette. """ return self.convertTo_m(self.ZLIM) @readEqdskContentsIfRequired def getCURRENT(self): """Returns plasma current in Ampere """ return self.CURRENT @readEqdskContentsIfRequired def getHEADER(self): return self.HEADER @readEqdskContentsIfRequired def getPSIRZ(self): return self.PSIRZ @readEqdskContentsIfRequired def getRDIM(self): return self.RDIM @readEqdskContentsIfRequired def getZDIM(self): return self.ZDIM @readEqdskContentsIfRequired def getNW(self): return self.NW @readEqdskContentsIfRequired def getNH(self): return self.NH @readEqdskContentsIfRequired def getLIMITR(self): return self.LIMITR @readEqdskContentsIfRequired def getNBBBS(self): return self.NBBBS @readEqdskContentsIfRequired def getRMAXIS(self): return self.RMAXIS @readEqdskContentsIfRequired def getZMAXIS(self): return self.ZMAXIS @readEqdskContentsIfRequired def getFPOL(self): return self.FPOL @readEqdskContentsIfRequired def getPRES(self): return self.PRES @readEqdskContentsIfRequired def getFFPRIM(self): return self.FFPRIM @readEqdskContentsIfRequired def getPPRIME(self): return self.PPRIME @readEqdskContentsIfRequired def getQPSI(self): return self.QPSI @readEqdskContentsIfRequired def getSIMAG(self): return self.SIMAG @readEqdskContentsIfRequired def getSIBRY(self): return self.SIBRY @readEqdskContentsIfRequired def getRCENTR(self): return self.RCENTR @readEqdskContentsIfRequired def getBCENTR(self): return self.BCENTR @readEqdskContentsIfRequired def getRLEFT(self): return self.RLEFT @readEqdskContentsIfRequired def getZMID(self): return self.ZMID def setHEADER(self, new): self.HEADER = new def setPSIRZ(self, new): self.PSIRZ = new def setCURRENT(self, new): self.CURRENT = new def setRDIM(self, new): self.RDIM = new def setZDIM(self, new): self.ZDIM = new def setNW(self, new): self.NW = new def setNH(self, new): self.NH = new def setLIMITR(self, new): self.LIMITR = new def setRLIM(self, new): self.RLIM = new def setZLIM(self, new): self.ZLIM = new def setNBBBS(self, new): self.NBBBS = new def setRBBBS(self, new): self.RBBBS = new def setZBBBS(self, new): self.ZBBBS = new def setRMAXIS(self, new): self.RMAXIS = new def setZMAXIS(self, new): self.ZMAXIS = new def setFPOL(self, new): self.FPOL = new def setPRES(self, new): self.PRES = new def setFFPRIM(self, new): self.FFPRIM = new def setPPRIME(self, new): self.PPRIME = new def setQPSI(self, new): self.QPSI = new def setSIMAG(self, new): self.SIMAG = new def setSIBRY(self, new): self.SIBRY = new def setRCENTR(self, new): self.RCENTR = new def setBCENTR(self, new): self.BCENTR = new def setRLEFT(self, new): self.RLEFT = new def setZMID(self, new): self.ZMID = new
class EQDSK_RBSpline(object): """Class for interpolating EQDSK-G data for finding the LCFS. """ def __init__(self): self.EQDSK = None self.INTRP = None self.NW = None self.R1 = None self.R2 = None self.NH = None self.Z1 = None self.Z2 = None def setEqdskObject(self, obj): self.EQDSK = obj def readEqdskFile(self, file): with open(file, 'r') as f: string = f.read() obj = EQDSK() obj.read(string) self.EQDSK = obj def interpolate(self): if not self.EQDSK: return False if not self.EQDSK.readOk(): return False # Width, Height self.NW = self.EQDSK.getNW() self.NH = self.EQDSK.getNH() # R values self.RDIM = self.EQDSK.getRDIM() self.R1 = self.EQDSK.getRLEFT() self.R2 = self.R1 + self.RDIM R = np.linspace(self.R1, self.R2, self.NW) # Z values self.ZMID = self.EQDSK.getZMID() self.ZDIM = self.EQDSK.getZDIM() self.Z1 = self.ZMID - 0.5 * self.ZDIM self.Z2 = self.ZMID + 0.5 * self.ZDIM Z = np.linspace(self.Z1, self.Z2, self.NH) self.PSIRZ = np.asarray(self.EQDSK.getPSIRZ()) from scipy.interpolate import RectBivariateSpline self.INTRP = RectBivariateSpline(Z, R, self.PSIRZ, kx=5, ky=5, s=0.0) return True def findStartBisectR(self, value, R1, R2, Z): """Find the starting point using bisection """ def bisectFunction(R, Z, value): """Traverse on a line where the Z value if constant """ return self.INTRP.ev(Z, R) - value from scipy.optimize import bisect return bisect(bisectFunction, R1, R2, maxiter=10000, args=(Z, value)) def findStartBisectZ(self, value, Z1, Z2, R): """Find the starting point using bisection """ def bisectFunction(Z, R, value): """Traverse on a line where the Z value if constant """ return self.INTRP.ev(Z, R) - value from scipy.optimize import bisect return bisect(bisectFunction, Z1, Z2, maxiter=10000, args=(R, value)) def _findCurve(self, pos, value, step, tolerance): i = 0 R = [pos[1]] Z = [pos[0]] _ok = False while i < 1e5: # Get derivatives der = self.INTRP.ev(pos[0], pos[1], dx=1), \ self.INTRP.ev(pos[0], pos[1], dy=1) rot90 = [der[1], -der[0]] rot90 /= np.linalg.norm(rot90) newPos = [0.0, 0.0] newPos[0] = pos[0] + step * rot90[0] newPos[1] = pos[1] + step * rot90[1] deltaPsi = value - self.INTRP.ev(*newPos) corrRot90 = [rot90[1], -rot90[0]] # Already normed der = self.INTRP.ev(newPos[0], newPos[1], dx=1), \ self.INTRP.ev(newPos[0], newPos[1], dy=1) scale = deltaPsi / np.linalg.norm(der) newPos[0] -= scale * corrRot90[0] newPos[1] -= scale * corrRot90[1] if i > 5000: disR = R[0] - newPos[1] disZ = Z[0] - newPos[0] distance = disR * disR + disZ * disZ # if distance < 1: # print(distance) if distance <= tolerance: _ok = True break if newPos[1] < self.R1 or newPos[1] > self.R2: # Out of bounds break if newPos[0] < self.Z1 or newPos[0] > self.Z2: # Out of bounds break R.append(newPos[1]) Z.append(newPos[0]) pos = newPos i += 1 return _ok, R, Z def findIsoContour(self, value, movementStep=1e-3, tolerance=1e-6): # Scan for intervals where the value is inside of it. _ok = False points = [] for i in range(10): for j in range(10): r1 = self.R1 + i * self.RDIM / 10.0 r2 = r1 + self.RDIM / 10.0 z_h = self.Z1 + (0.5 + j) * self.ZDIM / 10.0 v1 = self.INTRP.ev(z_h, r1) - value v2 = self.INTRP.ev(z_h, r2) - value if v1 * v2 < 0: _t = 'Horizontal' _ok = True # print('Z', z_h, 'R', r1, r2, v1, v2) points.append(['H', z_h, r1, r2]) continue z1 = self.Z1 + j * self.ZDIM / 10.0 z2 = z1 + self.ZDIM / 10.0 r_h = self.R1 + (0.5 + i) * self.RDIM / 10.0 v1 = self.INTRP.ev(z1, r_h) - value v2 = self.INTRP.ev(z2, r_h) - value if v1 * v2 < 0: _t = 'Vertical' _ok = True # print('R', r_h, 'Z', z1, z2, v1, v2) points.append(['V', r_h, z1, z2]) if not _ok: print('Failed to find value in the sweep.') return [], [] for _set in points: _t = _set[0] half, c1, c2 = _set[1:] if _t == 'H': pos = [half, self.findStartBisectR(value, c1, c2, half)] else: pos = [self.findStartBisectZ(value, c1, c2, half), half] _ok, R, Z = self._findCurve(pos, value, movementStep, tolerance) if _ok: return R, Z return [], []