import SMITER_ORB__POA
import SALOME_ComponentPy
import SALOME_Embedded_NamingService_ClientPy
import SALOME_DriverPy
import smardda_parameters
import re
import math
import os
import numpy as np
import SALOMEDS
from SMITER_utils import (findOrCreateComponent, objectID, moduleName, caseID,
geoqID, hdsgenID, powcalID, eqdskID, magtfmID,
unknownID, getStudy, verbose)
import json
from omniORB import CORBA
from salome.kernel.logger import Logger
from salome.kernel import termcolor
logger = Logger('SMITER', color=termcolor.RED_FG)
if verbose():
logger.showDebug()
myORB = CORBA.ORB_init([''], CORBA.ORB_ID)
# Map SObject.
__entryToIORMap__ = {}
def ObjectToString(object):
"""Get IOR for object
"""
return myORB.object_to_string(object)
def StringToObject(string):
"""Get object from IOR string
"""
return myORB.string_to_object(string)
"""Because in python2 json.load and json.loads create unicode objects and not
string object, the following example has been took from
https://stackoverflow.com/questions/956867/how-to-get-string-objects-instead\
-of-unicode-from-json/33571117#33571117, which returns string objects instead
of unicode objects
"""
def json_loads(json_text):
return _byteify(
json.loads(json_text, object_hook=_byteify),
ignore_dicts=True)
def _byteify(data, ignore_dicts = False):
# if this is a unicode string, return its string representation
# The following is not needed in Python >3
# if isinstance(data, unicode):
# return data.encode('utf-8')
# if this is a list of values, return list of byteified values
if isinstance(data, list):
return [ _byteify(item, ignore_dicts=True) for item in data ]
# if this is a dictionary, return dictionary of byteified keys and values
# but only if we haven't already byteified it
if isinstance(data, dict) and not ignore_dicts:
return {
_byteify(key, ignore_dicts=True): _byteify(value, ignore_dicts=True)
for key, value in data.items()
}
# if it's anything else, return it in its original form
return data
class SMITER(SMITER_ORB__POA.SMITER_Gen,
SALOME_ComponentPy.SALOME_ComponentPy_i,
SALOME_DriverPy.SALOME_DriverPy_i):
"""Construct an instance of SMITER module engine.
The class SMITER implements CORBA interface SMITER_Gen (see
SMITER_Gen.idl). It is inherited from the classes SALOME_ComponentPy_i
(implementation of Engines::EngineComponent CORBA interface - SALOME
component) and SALOME_DriverPy_i (implementation of SALOMEDS::Driver CORBA
interface - SALOME module's engine).
"""
def __init__(self, orb, poa, contID, containerName, instanceName,
interfaceName):
SALOME_ComponentPy.SALOME_ComponentPy_i.__init__(self, orb, poa,
contID, containerName, instanceName, interfaceName, False)
SALOME_DriverPy.SALOME_DriverPy_i.__init__(self, interfaceName)
#
emb_ns = self._contId.get_embedded_NS_if_ssl()
import CORBA
if CORBA.is_nil(emb_ns):
self._naming_service = SALOME_ComponentPy.SALOME_NamingServicePy_i( self._orb )
else:
self._naming_service = SALOME_Embedded_NamingService_ClientPy.SALOME_Embedded_NamingService_ClientPy(emb_ns)
pass
logger.debug('SMITER engine instance created')
def getVersion(self):
"""Get version information.
"""
import salome_version
return salome_version.getVersion("SMITER", True)
def makeBanner(self, name):
"""Generate hello banner.
"""
banner = "Hello %s!" % name
return banner
def raiseAnException(self):
"""Intentionnally raises an exception for test purposes.
"""
import SALOME
exData = SALOME.ExceptionStruct(SALOME.BAD_PARAM, "Test exception in"
" raiseAnException()", '', 0)
raise SALOME.SALOME_Exception(exData)
def createObject(self, name):
"""Create object.
"""
study = getStudy()
builder = study.NewBuilder()
father = findOrCreateComponent()
object = builder.NewObject(father)
attr = builder.FindOrCreateAttribute(object, "AttributeName")
attr.SetValue(name)
attr = builder.FindOrCreateAttribute(object, "AttributeLocalID")
attr.SetValue(objectID())
# self.PublishInStudy(study, object, geoq.GEOQ(name)._this(), name)
def CanPublishInStudy(self, IOR):
"""Enable publishing of objects in study.
Arguments:
IOR (string): Check objects type.
"""
return True
def publishCase(self, study, case):
"""Publishes study to the study object browser
"""
# First publish the case object
caseName = case.getName()
caseSObj = self.PublishInStudy(study, None, case, caseName)
# Now we publish the rest of the objects
wall = case.getWall()
if wall is None:
pass
else:
wallSObj = self.PublishInStudy(study, caseSObj, wall,
wall.getName())
case.setWallEntry(wallSObj.GetID())
target = case.getTarget()
targetSObj = self.PublishInStudy(study, caseSObj, target,
target.getName())
case.setTargetEntry(targetSObj.GetID())
shadow = case.getShadow()
shadowSObj = self.PublishInStudy(study, caseSObj, shadow,
shadow.getName())
case.setShadowEntry(shadowSObj.GetID())
hdsgen = case.getHDSGEN()
hdsgenSObj = self.PublishInStudy(study, caseSObj, hdsgen,
hdsgen.getName())
case.setHdsgenEntry(hdsgenSObj.GetID())
powcal = case.getPOWCAL()
powcalSObj = self.PublishInStudy(study, caseSObj, powcal,
powcal.getName())
case.setPowcalEntry(powcalSObj.GetID())
magtfm = case.getMAGTFM()
if magtfm is not None:
magtfmSObj = self.PublishInStudy(study, caseSObj, magtfm,
magtfm.getName())
case.setMAGTFMEntry(magtfmSObj.GetID())
eqdsk = case.getEQDSK()
eqdskName = eqdsk.getName().rsplit('/', 1)[-1] # Basename
eqdskSObj = self.PublishInStudy(study, caseSObj, eqdsk, eqdskName)
case.setEqdskEntry(eqdskSObj.GetID())
def PublishInStudy(self, study, sobject, object, name):
"""Publishes object to study
"""
if not object:
# Nothing to do
return sobject
builder = study.NewBuilder()
# Initiate command transaction
builder.NewCommand()
if not sobject:
father = findOrCreateComponent()
newSobject = builder.NewObject(father)
else:
# Create a child object under SObject
newSobject = builder.NewObject(sobject)
attr = builder.FindOrCreateAttribute(newSobject, 'AttributeName')
if not name:
name = object.getName()
attr.SetValue(name)
attr = builder.FindOrCreateAttribute(newSobject, 'AttributeIOR')
attr.SetValue(ObjectToString(object))
attr = builder.FindOrCreateAttribute(newSobject, 'AttributeLocalID')
pixmap = builder.FindOrCreateAttribute(newSobject, 'AttributePixMap')
if object._narrow(CASE):
attr.SetValue(caseID())
pixmap.SetPixMap('case_right_icon.png')
elif object._narrow(GEOQ):
attr.SetValue(geoqID())
pixmap.SetPixMap('shadow_icon.png')
elif object._narrow(HDSGEN):
attr.SetValue(hdsgenID())
pixmap.SetPixMap('hdsgen_icon.png')
elif object._narrow(POWCAL):
attr.SetValue(powcalID())
pixmap.SetPixMap('powcal_icon.png')
elif object._narrow(EQDSK):
attr.SetValue(eqdskID())
pixmap.SetPixMap('eqdsk_icon.png')
elif object._narrow(MAGTFM):
attr.SetValue(magtfmID())
pixmap.SetPixMap('magtfm_icon.png')
else:
attr.SetValue(unknownID())
# End and start the transaction
builder.CommitCommand()
return newSobject
def Save(self, component, URL, isMultiFile):
"""Saves data in the form of a dictionary which is then transformed
into JSON.
"""
logger.debug('SMITER.Save Started')
data2Save = {}
study = getStudy()
itr = study.NewChildIterator(component)
itr.InitEx(1)
while itr.More():
so = itr.Value()
soId = so.GetID()
itr.Next()
obj = so.GetObject()
if obj:
if obj._narrow(SMITER_ORB__POA.GEOQ):
TYPE = 'Geoq'
elif obj._narrow(SMITER_ORB__POA.HDSGEN):
TYPE = 'Hdsgen'
elif obj._narrow(SMITER_ORB__POA.POWCAL):
TYPE = 'Powcal'
elif obj._narrow(SMITER_ORB__POA.EQDSK):
TYPE = 'Eqdsk'
elif obj._narrow(SMITER_ORB__POA.CASE):
TYPE = 'Case'
elif obj._narrow(SMITER_ORB__POA.MAGTFM):
TYPE = 'Magtfm'
else:
continue
if TYPE in 'GeoqHdsgenPowcalMagtfm':
if obj.modified():
paramList = obj.returnListOfChangedVariables()
data = {key: obj.getVariable(key) for key in paramList}
else:
data = {}
if TYPE == 'Geoq':
data['geoqType'] = obj.getType()
data['meshEntry'] = obj.getMeshEntry()
if TYPE == 'Magtfm':
data['mag.txt'] = obj.getMAG()
elif TYPE == 'Eqdsk':
# Object is Eqdsk which is different
data = obj.generateText()
elif TYPE == 'Case':
# Gather the necessary entry regarding child objects
entries = [obj.getWallEntry(), obj.getTargetEntry(),
obj.getShadowEntry(), obj.getHDSGENEntry(),
obj.getPOWCALEntry(), obj.getEQDSKEntry(),
obj.getMAGTFMEntry()]
data = [entries]
else:
continue
data2Save[soId] = {
'type': TYPE,
'name': obj.getName(),
'data': data
}
stream = json.dumps(data2Save)
logger.debug('SMITER.Save Finished')
return stream.encode()
def Load(self, component, stream, theURL, isMultiFile):
logger.debug('SMITER.Load Started')
"""Reads the dictionary we saved
"""
__entryToIORMap__.clear()
if isMultiFile:
pass
try:
unpack = json_loads(stream.decode())
except Exception as e:
return False
# First traverse and get the case objects
cases = []
for ID in unpack:
TYPE = unpack[ID]['type']
if TYPE == 'Case':
obj = CASE()
obj.setName(unpack[ID]['name'])
if isinstance(unpack[ID]['data'][-1], dict):
results = unpack[ID]['data'][1]
if results:
for result in results:
obj.saveResult(result, results[result])
cases.append((obj, unpack[ID]['data'][0]))
else:
cases.append((obj, unpack[ID]['data'][0]))
pointer = obj._this()
__entryToIORMap__[ID] = ObjectToString(pointer)
for ID in unpack:
# ID is the SObject entry
TYPE = unpack[ID]['type']
if TYPE == 'Geoq':
obj = GEOQ()
elif TYPE == 'Hdsgen':
obj = HDSGEN()
elif TYPE == 'Powcal':
obj = POWCAL()
elif TYPE == 'Eqdsk':
obj = EQDSK()
elif TYPE == 'Magtfm':
obj = MAGTFM()
else:
continue
# Check association in a case object
for smiterCase, entries in cases:
if ID in entries:
if TYPE == 'Geoq':
geoqType = unpack[ID]['data']['geoqType']
if geoqType == 'target':
smiterCase.setTarget(obj._this())
smiterCase.setTargetEntry(ID)
elif geoqType == 'shadow':
smiterCase.setShadow(obj._this())
smiterCase.setShadowEntry(ID)
elif geoqType == 'wall':
smiterCase.setWall(obj._this())
smiterCase.setWallEntry(ID)
else:
continue
elif TYPE == 'Hdsgen':
smiterCase.setHDSGEN(obj._this())
smiterCase.setHdsgenEntry(ID)
elif TYPE == 'Powcal':
smiterCase.setPOWCAL(obj._this())
smiterCase.setPowcalEntry(ID)
elif TYPE == 'Eqdsk':
smiterCase.setEQDSK(obj._this())
smiterCase.setEqdskEntry(ID)
elif TYPE == 'Magtfm':
smiterCase.setMAGTFM(obj._this())
smiterCase.setMAGTFMEntry(ID)
else:
continue
# Set data and map SObject to CORBA instance of object.
obj.setName(unpack[ID]['name'])
if TYPE in 'GeoqHdsgenPowcalMagtfm':
# Its important to first set the GEOQ type, so that the
# proper DEFAULT VARIABLES are loaded, otherwise no variable
# can be set.
if TYPE == 'Geoq':
obj.setType(unpack[ID]['data']['geoqType'])
obj.setMeshEntry(unpack[ID]['data']['meshEntry'])
if TYPE == 'Magtfm':
obj.setMAG(unpack[ID]['data']['mag.txt'])
for key in unpack[ID]['data']:
if unpack[ID]['data'] == '':
obj.clearVariable(key)
else:
obj.setVariable(key, unpack[ID]['data'][key])
else:
# Use lazy set instead of reading the EQDSK data.
# obj.read(unpack[ID]['data'])
obj.setEqdskContent(unpack[ID]['data'])
# EQDSK.
# obj.read(unpack[ID]['data'])
pointer = obj._this()
__entryToIORMap__[ID] = ObjectToString(pointer)
logger.debug('SMITER.Load Finished')
return True
def Close(self, theComponent):
logger.debug('SMITER.Close: Called')
def LocalPersistentIDToIOR(self, theSObject, PersistentID, isMultiFile,
isASCII):
if PersistentID in __entryToIORMap__:
return __entryToIORMap__[PersistentID]
def DumpPython(self, isPublished, isMultiFile):
"""Dump module data to the Python script.
"""
abuffer = []
names = []
study = getStudy()
father = study.FindComponent(moduleName())
if father:
iter = study.NewChildIterator(father)
while iter.More():
name = iter.Value().GetName()
if name:
names.append(name)
iter.Next()
if names:
abuffer += ["from salome import lcc"]
abuffer += ["import SMITER_ORB"]
abuffer += [""]
abuffer += ["pyhello = lcc.FindOrLoadComponent( "
"'FactoryServerPy', '%s' )" % moduleName()]
abuffer += [""]
abuffer += ["pyhello.createObject( theStudy, '%s' )" % name
for name in names]
abuffer += [""]
if isMultiFile:
abuffer = [" " + s for s in abuffer]
abuffer[0:0] = ["def RebuildData( theStudy ):"]
abuffer += [" pass"]
abuffer += ["\0"]
res = "\n".join(abuffer)
return (res.encode(), 1)
class Base(object):
def __init__(self, name:str = ''):
self.order = []
self.params = {}
self.variables = {}
self.name = name
self._defaultValueModified = False
def clearVariable(self, name: str):
"""Because CORBA disallows passing empty string as parameters, this
is used instead. Strange though, retrieving empty string gives no
errors.
"""
for section in self.order:
if name in self.params[section]:
if self.params[section] == '':
if name in self.variables:
self.variables.pop(name)
if not len(self.variables):
self._defaultValueModified = False
return True
else:
self.variables[name] = ''
return False
def setVariable(self, name: str, val: str):
"""Sets the new value to variable name. If the value is the same as
the one in the default options, then the value is not changed or in
other words no actual change has been added.
"""
# Strip the value
val = val.strip()
if not val:
return False
for section in self.order:
if name in self.params[section]:
if val == self.params[section][name]:
if name in self.variables:
# Changing the user modified value back to default
# value.
self.variables.pop(name)
if not len(self.variables):
self._defaultValueModified = False
return True
else:
# The same value as the default one
return True
else:
self.variables[name] = val
self._defaultValueModified = True
return True
return False
def getVariable(self, name: str):
"""Returns the value of parameter. If the parameter is not in the
self.variables attribute, then we can try to get the default Value.
"""
if name not in self.variables:
for section in self.order:
if name in self.params[section]:
return self.params[section][name]
return ''
return self.variables[name]
def getName(self):
"""Returns the name.
Returns:
name (str): String which has whites-paces replaced with `_`.
"""
return self.name
def setName(self, name):
"""Set the name for object. Usually it should be the same as the name
of the Salome Object that references to the object.
"""
self.name = name
def returnListOfChangedVariables(self):
"""Returns a list of parameters that have non-default values.
"""
return [key for key in self.variables.keys()]
def createCTL(self):
"""Creates a string in the form of CTL.
Returns:
ctl (str): Parameters stored in the form of FORTRAN namelist.
"""
ctl = ''
for section in self.order:
ctl += '&%s\n' % section
for parameter in self.params[section]:
if parameter in self.variables:
value = self.variables[parameter]
if value:
if section in 'inputfilesmagfiles':
ctl += " %s = '%s',\n" % (parameter, value)
else:
ctl += ' %s = %s,\n' % (parameter, value)
ctl += '/\n'
return ctl
def modified(self):
"""Returns true if there are non-default values stored.
"""
return self._defaultValueModified
[docs]class POWCAL(SMITER_ORB__POA.POWCAL, Base):
"""Powcal interface implementation
"""
def __init__(self, name=''):
Base.__init__(self, name)
self.order = smardda_parameters.POWCAL_order
self.params = smardda_parameters.POWCAL_params
[docs]class HDSGEN(SMITER_ORB__POA.HDSGEN, Base):
"""HDSGEN interface implementation
"""
def __init__(self, name=''):
Base.__init__(self, name)
self.order = smardda_parameters.HDSGEN_order
self.params = smardda_parameters.HDSGEN_params
class MAGTFM(SMITER_ORB__POA.MAGTFM, Base):
def __init__(self, name=''):
Base.__init__(self, name)
self.order = smardda_parameters.MAGTFM_order
self.params = smardda_parameters.MAGTFM_params
self.mag = ''
def setMAG(self, mag):
"""Set the vacuum magnetic field data.
"""
self.mag = mag
def getMAG(self):
"""Get the vacuum magnetic field data.
"""
return self.mag
[docs]class GEOQ(SMITER_ORB__POA.GEOQ, Base):
"""GEOQ interface implementation.
Attributes:
name (string): Name for the geoq instance
type (string): Type of geoq instance. Either target or shadow.
variables (dict): A dictionary for non default values for parameters
_defaultValueModified (bool): Is true if nondefault values are present
meshVTK (string): VTK format of mesh. It is generated on instance
"""
def __init__(self, name='', type=''):
Base.__init__(self, name)
self.order = smardda_parameters.GEOQ_order
self.params = smardda_parameters.GEOQ_params
self.type = type
self.meshEntry = ''
if type:
self.setType(type)
[docs] def setMeshEntry(self, entry):
"""If used with SALOME study, then the entry is the SObject from
Object browser that represents the mesh we wish to use for GEOQ.
"""
self.meshEntry = entry
[docs] def getMeshEntry(self):
"""Returns the entry of the Salome object that references to the
GEOQ object.
"""
return self.meshEntry
[docs] def setType(self, type):
"""Refers to the type to be used when running SMARDA case.
Target means that the mesh will be later used for power calculation.
Shadow means that it will be used by hdsgen and powcal.
"""
self.type = type
[docs] def getType(self):
"""Returns the type of GEOQ.
"""
return self.type
[docs]class CASE(SMITER_ORB__POA.CASE):
"""Case object for running SMARDDA. It contains references to all input
objects for easier use. Accordingly there are set and get functions for
each object holding the input data.
The function ``setInputParameters`` sets the parameters of the control CTL
files, which requires the input or output data from other input objects.
Attributes:
wall (:py:mod:`GEOQ`): GEOQ object that contains parameters for wall
wallEntry (str): Entry for wall GEOQ Salome Object. If no Salome Object
is present then an empty string is placed in.
target (:py:mod:`GEOQ`): GEOQ object that contains parameters for
target
targetEntry (str): Entry for target GEOQ Salome Object. If no Salome
object is present then an empty string is placed in.
shadow (:py:mod:`GEOQ`): GEOQ object that contains parameters for
shadow
shadowEntry (str):Entry for shadow GEOQ Salome Object. If no Salome
Object is present then an empty string is placed in.
hdsgen (:py:mod:`HDSGEN`): HDSGEN object, HDS means Hierarchical Data
Structure
hdsgenEntry (str): Entry for target HDSGEN Salome Object. If no Salome
object is present then an empty string is placed in.
powcal (:py:mod:`POWCAL`): POWCAL object
powcalEntry (str): Entry for target POWCAL Salome Object. If no Salome
object is present then an empty string is placed in.
eqdsk (:py:mod:`EQDSK`): EQDSK object, that contains information about
equilibrium.
eqdskEntry (str): Entry for target EQDSK Salome Object. If no Salome
object is present then an empty string is placed in.
"""
def __init__(self, caseName=''):
# self.geoqTarget = geoq.GEOQ() # Empty class
# self.geoqShadow = geoq.GEOQ()
# self.hdsgen = hdsgen.HDSGEN()
# self.powcal = powcal.POWCAL()
# self.eqdsk = eqdsk.EQDSK()
self.caseName = caseName
self.wall = None
self.wallEntry = ''
self.target = None
self.targetEntry = ''
self.shadow = None
self.shadowEntry = ''
self.hdsgen = None
self.hdsgenEntry = ''
self.powcal = None
self.powcalEntry = ''
self.magtfm = None
self.magtfmEntry = ''
self.eqdsk = None
self.eqdskEntry = ''
[docs] def setName(self, caseName):
"""Set a name for the case. It should be the same as the name displayed
to the **Salome Object** in **Object browser**.
Arguments:
caseName (str): Name for the case.
"""
self.caseName = caseName
[docs] def getName(self):
"""Returns the name.
Returns:
name (str): String which has whites-paces replaced with `_`.
"""
return self.caseName.replace(' ', '_')
[docs] def setTarget(self, target):
"""Sets the target :py:mod:`GEOQ` object.
Arguments:
target (:py:mod:`GEOQ`): GEOQ object for target
"""
self.target = target
def setTargetEntry(self, entry):
self.targetEntry = entry
[docs] def getTarget(self):
"""Returns the :py:mod:`GEOQ` object of the target.
Returns:
target (:py:mod:`GEOQ`): GEOQ object for the target
"""
return self.target
[docs] def getTargetEntry(self):
"""Returns the *Salome Object* entry referencing to the target
"py:mod:`GEOQ` object.
Returns:
targetEntry (str): Entry for the *Salome Object*.
"""
return self.targetEntry
[docs] def setWall(self, wall):
"""Sets the wall :py:mod:`GEOQ` object.
Arguments:
wall (:py:mod:`GEOQ`): GEOQ object for wall
"""
self.wall = wall
def setWallEntry(self, entry):
self.wallEntry = entry
[docs] def getWall(self):
"""Returns the :py:mod:`GEOQ` object of the wall.
Returns:
wall (:py:mod:`GEOQ`): GEOQ object for the wall.
"""
return self.wall
[docs] def getWallEntry(self):
"""Returns the *Salome Object* entry referencing to the wall
"py:mod:`GEOQ` object.
Returns:
wallEntry (str): Entry for the *Salome Object*.
"""
return self.wallEntry
[docs] def setShadow(self, shadow, entry=''):
"""Sets the shadow :py:mod:`GEOQ` object.
Arguments:
shadow (:py:mod:`GEOQ`): GEOQ object for shadow.
"""
self.shadow = shadow
def setShadowEntry(self, entry):
self.shadowEntry = entry
[docs] def getShadow(self):
"""Returns the :py:mod:`GEOQ` object of the shadow.
Returns:
shadow (:py:mod:`GEOQ`): GEOQ object for the shadow.
"""
return self.shadow
[docs] def getShadowEntry(self):
"""Returns the *Salome Object* entry referencing to the shadow
"py:mod:`GEOQ` object.
Returns:
shadowEntry (str): Entry for the *Salome Object*.
"""
return self.shadowEntry
[docs] def setHDSGEN(self, hdsgen, entry=''):
"""Sets the hdsgen :py:mod:`HDSGEN` object.
Arguments:
hdsgen (:py:mod:`HDSGEN`): HDSGEN object.
"""
self.hdsgen = hdsgen
def setHdsgenEntry(self, entry):
self.hdsgenEntry = entry
[docs] def getHDSGEN(self):
"""Returns the :py:mod:`HDSGEN` object of the hdsgen.
Returns:
hdsgen (:py:mod:`HDSGEN`): HDSGEN object for the hdsgen.
"""
return self.hdsgen
[docs] def getHDSGENEntry(self):
"""Returns the *Salome Object* entry referencing to the hdsgen
"py:mod:`HDSGEN` object.
Returns:
hdsgenEntry (str): Entry for the *Salome Object*.
"""
return self.hdsgenEntry
[docs] def setPOWCAL(self, powcal, entry=''):
"""Sets the powcal :py:mod:`POWCAL` object.
Arguments:
powcal (:py:mod:`POWCAL`): POWCAL object.
"""
self.powcal = powcal
def setPowcalEntry(self, entry):
self.powcalEntry = entry
[docs] def getPOWCAL(self):
"""Returns the :py:mod:`POWCAL` object of the powcal.
Returns:
powcal (:py:mod:`POWCAL`): POWCAL object for the powcal.
"""
return self.powcal
[docs] def getPOWCALEntry(self):
"""Returns the *Salome Object* entry referencing to the powcal
"py:mod:`POWCAL` object.
Returns:
powcalEntry (str): Entry for the *Salome Object*.
"""
return self.powcalEntry
[docs] def setMAGTFM(self, magtfm):
"""Sets the magtfm :py:mod:`MAGTFM` object.
Arguments:
magtfm (:py:mod:`MAGTFM`): MAGTFM object.
"""
self.magtfm = magtfm
def setMAGTFMEntry(self, entry):
self.magtfmEntry = entry
def getMAGTFM(self):
return self.magtfm
def getMAGTFMEntry(self):
return self.magtfmEntry
[docs] def setEQDSK(self, equilibrium, entry=''):
"""Sets the eqdsk :py:mod:`EQDSK` object.
Arguments:
eqdsk (:py:mod:`EQDSK`): EQDSK object for shadow.
"""
self.eqdsk = equilibrium
def setEqdskEntry(self, entry):
self.eqdskEntry = entry
[docs] def getEQDSK(self):
"""Returns the :py:mod:`EQDSK` object of the eqdsk.
Returns:
eqdsk (:py:mod:`EQDSK`): EQDSK object for the eqdsk.
"""
return self.eqdsk
[docs] def getEQDSKEntry(self):
"""Returns the *Salome Object* entry referencing to the eqdsk
"py:mod:`EQDSK` object.
Returns:
eqdskEntry (str): Entry for the *Salome Object*.
"""
return self.eqdskEntry
[docs] def generateSaveString(self) -> str:
"""Create a dictionary containing all information of the case and then
output it as a string.
"""
out = {}
if self.wall is not None:
# Wall GEOQ parameters
out['wall'] = {param: self.wall.getVariable(param) for param in
self.wall.returnListOfChangedVariables()}
if self.target is not None:
# Target GEOQ parameters
out['target'] = {param: self.target.getVariable(param) for param in
self.target.returnListOfChangedVariables()}
if self.shadow is not None:
out['shadow'] = {param: self.shadow.getVariable(param) for param in
self.shadow.returnListOfChangedVariables()}
if self.hdsgen is not None:
out['hds'] = {param: self.hdsgen.getVariable(param) for param in
self.hdsgen.returnListOfChangedVariables()}
if self.powcal is not None:
out['powcal'] = {param: self.hdsgen.getVariable(param) for param in
self.hdsgen.returnListOfChangedVariables()}
# Eqdsk data
if self.eqdsk is not None:
out['eqdsk'] = self.eqdsk.generateText()
# MAG data
if self.mag is not None:
out['mag'] = self.mag.getMAG()
try:
jsonOut = json.dumps(out)
except Exception as e:
print(e)
print('Failed to create JSON string from CASE data dictionary.')
return ''
return jsonOut
[docs] def loadString(self, jsonString: str) -> bool:
"""Take a json string containing dictionary with all paremeters for
the case.
"""
try:
inp = json.loads(jsonString)
except Exception as e:
print(e)
print('Failed to load the string with JSON.')
return False
if 'wall' in inp:
self.wall = GEOQ()
return True
"""
IMPORTANT!
Since the EQDSK G format is no longer a consistent standard (deviation,
comments, non-fixed format), this utility is only used for saving the contents
of the loaded EQDSK file and saving to a file whenever a SMITER case is run.
The parsing is still done for plotting purposes or for just debugging of the
format, i.e., to see if the EQDSK file conforms to the EQDSK G format.
"""
# Wrapper function that checks if an EQDSK instance needs to parse the data,
# when a request for a quantity happens.
def readEqdskContentsIfRequired(f):
def wrapper(*args):
instance = args[0]
if not instance.hasItBeenRead:
instance.read(instance.eqdskString)
return f(*args)
return wrapper
[docs]class EQDSK(SMITER_ORB__POA.EQDSK):
"""EQDSK interface implementation
The following quantities are read from a ``EQDSK G``.
Attributes:
PSIRZ (arr): Array of plasma psi values in units WEBER.
CURRENT (float): Plasma current in Ampere
RDIM (float): Horizontal dimension in meter of computational box
ZDIM (float): Vertical dimension in meter of computational box
NW (int): Number of horizontal R grid points
NH (int): Number of vertical Z grid points
LIMITR (int): Number of limiter points
RLIM (arr): R of surrounding limiter contour in meter
ZLIM (arr): Z of surrounding limiter contour in meter
NBBBS (int): Number of boundary points
RBBBS (arr): R of boundary points in meter
ZBBBS (arr): Z of boundary points in meter
RMAXIS (arr): R of magnetic axis in meter
ZMAXIS (arr): Z of magnetic axis in meter
FPOL (arr): Poloidal current function in m-T,
F=RB_T on flux grid
PRES (arr): Plasma pressure in nt/m^2 on uniform flux grid
FFPRIM (arr): FF'(Psi) in (mT)^2/(Weber /rad) on uniform flux grid
PPRIME (arr): P'(Psi) in (nt/m^2) / (Weber /rad) on uniform flux grid
QPSI (arr): q values on uniform flux grid from axis to boundary
SIMAG (arr): poloidal flux at magnetic axis in Weber /rad
SIBRY (arr): poloidal flux at the plasma boundary in Weber
/rad
RCENTR (float): R in meter of vacuum toroidal magnetic field BCENTR
BCENTR (float): Vacuum toroidal magnetic field in Tesla at RCENTR
RLEFT (float): Minimum R in meter of rectangular computational box
ZMID (float): Z of center of computational box in meter
.. note::
The quantity FPOL is poloidal current function in m-T (r Bt). It's
domain is the flux region from SIMAG to SIBRY.
They are all accessible with the following example
.. code-block:: python
import eqdsk
x = eqdsk.EQDSK()
eqdskFile = '/path/to/eqdsk/format/file'
# If SMITER is located on your machine
x.openAndRead(eqdskFile)
# If SMITER is used remotely you have to manually pass the contents
# of the EQDSK file. In this case:
with open(eqdskFile) as f:
data = f.read()
ok = x.read(data)
# Then set the name of the eqdsk file
if ok:
# Reading Successful
eqdskName = eqdskFile.rpslit('/', 1)[-1] # Get the name of the file
x.setName(eqdskFile)
else:
print('Trouble reading EQDSK file %s' % eqdskFile)
# If the file was read without a problem, you can access, i.e, the
# geometry of the limiter
x.RLIM
x.ZLIM
"""
# Pattern for reading integer values separated with spaces
integerPattern = u'(?:^|(?<=\s))-?[0-9]+(?:$|(?=\s))'
intPatt = re.compile('.*' + integerPattern + '.*')
# Pattern for reading scientific numbers
scientificPattern = u'[+-]?[0-9]\.[0-9]{6,9}[eE][+-][0-9]{2}'
# Pattern for reading both integers and scientific numbers.
genPatt = re.compile(u'(?:%s|%s)' % (integerPattern, scientificPattern))
def __init__(self, name=''):
self.name = name
self.logMessage = ''
self.resetValues()
def resetValues(self):
self.INPUT_EQDSK = ''
self.HEADER = ''
self.PSIRZ = [] # Plasma psi WEBER
self.CURRENT = None
self.RDIM = None # Size dimension for R and Z
self.ZDIM = None
self.NW = None # Dimension of grid
self.NH = None
self.LIMITR = None # Number of limiter points
self.RLIM = [] # rlim
self.ZLIM = [] # zlim
self.NBBBS = None # Number of Boundary points
self.RBBBS = [] # rbbbs
self.ZBBBS = [] # zbbs
self.RMAXIS = [] # Position of toroidal magnetic
self.ZMAXIS = [] # field
self.FPOL = []
self.PRES = []
self.FFPRIM = []
self.PPRIME = []
self.QPSI = []
self.SIMAG = None # Flux value at magnetix axis
self.SIBRY = None # Flux value on boundary
self.RCENTR = None # Reference value: position R, mag field B
self.BCENTR = None # B value in RCENTR
self.RLEFT = None # Minimum of rectangular computational box.
self.ZMID = None # Vertical dimension of computational box
"""
Or in another case:
The grid goes from :
RLEFT to RLEFT + RDIM
-ZDIM / 2 to ZDIM / 2
"""
self.eqdskString = ''
self.hasItBeenRead = False
self.successfullRead = False
def getName(self):
"""Returns the name.
Returns:
name (str): String which has whites-paces replaced with `_`.
"""
return self.name.replace(' ', '_')
def setName(self, name):
"""Set a name for eqdsk.. It should be the same as the name displayed
to the **Salome Object** in **Object browser**.
Arguments:
eqdskName (str): Name for the case.
"""
self.name = name
def openAndRead(self, filePath):
if not os.access(filePath, os.F_OK | os.R_OK):
print('File does not exist or lacking permission to read it!')
return False
data = ''
with open(filePath, 'r') as f:
data = f.read()
# Setting name
self.setName(filePath.split('/')[-1])
return self.read(data)
def logValue(self, msg, value, t='f'):
if t == 'd':
formatedMsg ='{0:>30} %d'.format(msg + ':') % value
else:
formatedMsg ='{0:>30} %f'.format(msg + ':') % value
self.logMessage += formatedMsg + '\n'
def log(self, msg):
"""Function used to save the parsing information of the EQDSK file.
"""
self.logMessage += msg + '\n'
@readEqdskContentsIfRequired
def getLog(self):
return self.logMessage
def setEqdskContent(self, eqdskString):
"""Set the full contents of an EQDSK file, but do not read it!
"""
self.eqdskString = eqdskString
def getEqdskContent(self):
"""Return the full contents of an EQDSK file.
"""
return self.eqdskString
def read(self, eqdskString):
"""Passing string instead of a filename, to avoid any troubles if this
is used on a server.
"""
self.resetValues()
self.eqdskString = eqdskString
# Reset the log
self.logMessage = 'Reading eqdsk file %s\n' % self.name
LINES = eqdskString.splitlines()
HEADER = LINES[0]
self.HEADER = HEADER
if len(self.HEADER) > 79:
self.logMessage += 'First line is longer than 79 characters!\n'
# regexHeader = '^.+[0-9] +?([0-9]+) +?([0-9]+)'
regexHeader = '\s[0-9]\s+([0-9]+)\s+([0-9]+)'
result = re.findall(regexHeader, HEADER)
# self.log("Reading 1st line of EQDSK.")
if result:
nw, nh = result[0]
self.NW = int(nw)
self.NH = int(nh)
self.logValue('Width', self.NW, t='d')
self.logValue('Height', self.NH, t='d')
else:
self.log("Couldn't read the dimensions from the 1st line.")
self.log("LINE:\n%s" % HEADER)
return False
# self.log('Done reading 1st line.')
# self.log("Reading 2nd line of EQDSK")
i = 1
result = self.readOneLine(LINES[i])
if result == -1:
# self.log("[INFO] 2nd line must be a comment")
while 1:
# self.log("[INFO] Filtering the comments.")
i += 1
result = self.readOneLine(LINES[i])
if result != -1:
break
# Precaution, if there are 100 wrong lines, just end the
# reading
if i > 100:
return False
try:
self.RDIM = result[0]
self.ZDIM = result[1]
self.RCENTR = result[2]
self.RLEFT = result[3]
self.ZMID = result[4]
self.logValue('Dimension R', self.RDIM)
self.logValue('Dimension Z', self.ZDIM)
self.logValue('Center R', self.RCENTR)
self.logValue('Left R', self.RLEFT)
self.logValue('Middle Z', self.ZMID)
# self.log('Done reading 2nd line.')
# self.log("Reading 3rd line of EQDSK")
i += 1
result = self.readOneLine(LINES[i])
self.RMAXIS = result[0]
self.ZMAXIS = result[1]
self.SIMAG = result[2]
self.SIBRY = result[3]
self.BCENTR = result[4]
self.logValue('Magnetic axis position R',
self.RMAXIS)
self.logValue('Magnetic axis position Z',
self.ZMAXIS)
self.logValue('Flux at magnetic axis', self.SIMAG)
self.logValue('Flux at boundary', self.SIBRY)
self.logValue('Center B', self.BCENTR)
# self.log("Done reading 3rd line.")
i += 1
# self.log('Reading 4th line of EQDSK')
result = self.readOneLine(LINES[i])
self.CURRENT = result[0]
self.logValue('Current', self.CURRENT)
msg = "The 2nd value on line 4 of the EQDSK is not equal to " \
"the flux at magnetic axis read from the 3rd line"
assert self.SIMAG == result[1], msg
DUMMY = result[2]
msg = "The 4th value on line 4 of the EQDSK is not equal to the"\
" magnetic axis R position, read from the 3rd line!"
assert self.RMAXIS == result[3], msg
DUMMY = result[4]
# self.log("Done reading 4th line.")
i += 1
# self.log("Reading 5th line of EQDSK")
result = self.readOneLine(LINES[i])
msg = "The 1st value on line 5 is not equal to the magnetic " \
" axis Z position, read from the 3rd line!"
assert self.ZMAXIS == result[0], msg
DUMMY = result[1]
msg = " The 3rd value on line 5 is not equal to the flux at " \
" boundary, read from the 3rd line"
assert self.SIBRY == result[2], msg
DUMMY = result[3]
DUMMY = result[4]
# self.log("Done reading 5th line.")
line_index = i + 1
self.log("Reading values from 5th line on...")
P1, P2 = self.readValues(LINES[line_index:])
N_VALUES = len(P1) + len(P2)
TOTAL_VALUES = 5 * self.NW + self.NH * self.NW + 2
_I = 0
self.FPOL = P1[_I: _I + self.NW]
_I += self.NW
self.PRES = P1[_I: _I + self.NW]
_I += self.NW
self.FFPRIM = P1[_I: _I + self.NW]
_I += self.NW
self.PPRIME = P1[_I: _I + self.NW]
_I += self.NW
self.PSIRZ = []
for i in range(self.NH):
self.PSIRZ.append(P1[_I: _I + self.NW])
_I += self.NW
self.QPSI = P1[_I: _I + self.NW]
_I += self.NW
_I = 0
self.NBBBS = int(P2[_I])
_I += 1
self.LIMITR = int(P2[_I])
_I += 1
self.logValue('N boundary', self.NBBBS, t='d')
self.logValue('N limiter', self.LIMITR, t='d')
self.log("Read %d values." % N_VALUES)
TOTAL_VALUES += (self.NBBBS + self.LIMITR) * 2
self.log('%d actual values required according to EQDSK G format.' %
TOTAL_VALUES)
self.RBBBS = []
self.ZBBBS = []
for i in range(self.NBBBS):
self.RBBBS.append(P2[_I])
self.ZBBBS.append(P2[_I + 1])
_I += 2
self.RLIM = []
self.ZLIM = []
for i in range(self.LIMITR):
self.RLIM.append(P2[_I])
self.ZLIM.append(P2[_I + 1])
_I += 2
except Exception as e:
print(e)
self.successfullRead = False
return False
self.successfullRead = True
return True
def readOneLine(self, line):
"""Reads the float values from one line.
"""
pattern = '-?[0-9]{1}\.[0-9]{9}[eE][-+]?[0-9]{2}'
result = re.findall(pattern, line)
if line.lstrip()[0].isalpha():
return -1
if len(result) == 5:
# 5 float values on line.
out = [float(el) for el in result]
return out
else:
# Manually check the values
out = []
for el in line.split():
try:
out.append(float(el))
except Exception as e:
pass
if len(out) == 5:
# 5 values. All ok
return out
else:
# Must be a comment line
return -1
def readValues(self, LINES):
"""Reads values from EQDSK lines.
"""
# Part one consists of values before the section of limiter and
# boundary geometry. Basically meaning reading values until finding
# two integer values.
part_1 = []
# Part two are all values regarding plasma and boundary geometry.
# HOPEFULLY, the lines start with two INTEGERS and not somewhere in
# between.
part_2 = []
# # Pattern for reading integer values separated with spaces
# integerPattern = u'(?:^|(?<=\s))-?[0-9]+(?:$|(?=\s))'
# intPatt = re.compile('.*' + integerPattern + '.*')
# # Pattern for reading scientific numbers
# scientificPattern = u'[+-]?[0-9]\.[0-9]{6,9}[eE][+-][0-9]{2}'
# pattern = u'(?:%s|%s)' % (integerPattern, scientificPattern)
# genPatt = re.compile(pattern)
SWITCH = 1
for line in LINES:
if SWITCH and self.intPatt.match(line):
SWITCH = 0
result = self.genPatt.findall(line)
if SWITCH:
part_1 += [float(el) for el in result]
else:
part_2 += [float(el) for el in result]
return part_1, part_2
def readOk(self):
return self.successfullRead
def generateText(self):
"""Generates an EQDSK format text.
"""
# return self.eqdskString
# The problem with EQDSK G format is that is no longer a consistent
# standard. Therefore the utility eqdsk.py is mainly used for reading
# the value, maybe validating the format and using the read value
# to plot data. Therefore when we want to save the contents of the
# eqdskString, the eqdsk is no longer generated by the EQDSK G format
# but the contents of the loaded eqdsk file is just printed.
#---------------------------------------------------------------------
if not self.hasItBeenRead:
self.read(self.eqdskString)
if not self.successfullRead:
return self.eqdskString
self.generatedText = ''
self.generatedText += self.HEADER + '\n'
# First line
DUMMY = 0.0
self._PerLineCount = 1
self.createRealLine(self.RDIM, self.ZDIM, self.RCENTR,
self.RLEFT, self.ZMID)
self.createRealLine(self.RMAXIS,
self.ZMAXIS,
self.SIMAG, self.SIBRY,
self.BCENTR)
self.createRealLine(self.CURRENT, self.SIMAG, DUMMY,
self.RMAXIS, DUMMY)
self.createRealLine(self.ZMAXIS, DUMMY,
self.SIBRY, DUMMY, DUMMY)
self.createRealLine(*self.FPOL)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
self._PerLineCount = 1
self.createRealLine(*self.PRES)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
self._PerLineCount = 1
self.createRealLine(*self.FFPRIM)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
self._PerLineCount = 1
self.createRealLine(*self.PPRIME)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
self._PerLineCount = 1
for psi_width in self.PSIRZ:
self.createRealLine(*psi_width)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
self._PerLineCount = 1
self.createRealLine(*self.QPSI)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
self.createIntLine(self.NBBBS, self.LIMITR)
self.generatedText += '\n'
self._PerLineCount = 1
alternating_array = [None] * 2 * self.NBBBS
alternating_array[::2] = self.RBBBS
alternating_array[1::2] = self.ZBBBS
self.createRealLine(*alternating_array)
self.generatedText += '\n'
self._PerLineCount = 1
if self.LIMITR:
alternating_array = [None] * 2 * self.LIMITR
alternating_array[::2] = self.RLIM
alternating_array[1::2] = self.ZLIM
self.createRealLine(*alternating_array)
if not self.generatedText.endswith('\n'):
self.generatedText += '\n'
return self.generatedText
def createRealLine(self, *args):
Format = '%.9E'
for el in args:
stringReal = Format % el
self.generatedText += ' ' * (16 - len(stringReal)) + stringReal
if self._PerLineCount % 5 == 0:
self.generatedText += '\n'
self._PerLineCount += 1
def createIntLine(self, *args):
for el in args:
self.generatedText += ' ' + str(el)
def mmMoveInR(self, mm):
""" Radially moves the values of eqdsk for *mm* millimeters.
All R relevant quantities:
RLIM
RBBBS
RLEFT
RMAXIS
Note that these quantities are not necessarily in millimeters!
"""
if not self.successfullRead:
return
orderLim = '%.1E' % max(self.RLIM)
orderLim = int(orderLim[-2:])
scale = 1.0
if orderLim == 0:
# Meters
scale = 1e-3
elif orderLim == 2:
# Centimeters
scale = 1e-2
elif orderLim == 3:
# Millimeters
scale = 1
else:
# print('Strange units as RLIM!')
return
for i in range(len(self.RLIM)):
self.RLIM[i] += scale * mm
orderBoundary = '%.1E' % max(self.RBBBS)
orderBoundary = int(orderBoundary[-2:])
scale = 1.0
if orderBoundary == 0:
scale = 1e-3
elif orderBoundary == 2:
scale = 1e-2
elif orderBoundary == 3:
scale = 1
else:
# print('Strange Units at RBBBS!')
return
for i in range(len(self.RBBBS)):
self.RBBBS[i] += scale * mm
orderLeftR = '%.1E' % self.RLEFT
orderLeftR = int(orderLeftR[-2:])
scale = 1.0
if orderLeftR == 0:
scale = 1e-3
elif orderLeftR == 2:
scale = 1e-2
elif orderLeftR == 3:
scale = 1
else:
# print('Strange Units at RLEFT!')
return
self.RLEFT += scale * mm
orderMagAxisR = '%.1E' % self.RLEFT
orderMagAxisR = int(orderMagAxisR[-2:])
scale = 1.0
if orderMagAxisR == 0:
scale = 1e-3
elif orderMagAxisR == 2:
scale = 1e-2
elif orderMagAxisR == 3:
scale = 1
else:
# print('Strange Units at RMAXIS!')
return
self.RMAXIS += scale * mm
def fluxOffset(self, offset):
"""Offsetting the flux quantities.
SIMAG += offset
SIBRY = offset
PSIRZ += offset
"""
if not self.successfullRead:
return
self.SIMAG += offset
self.SIBRY = offset
X = len(self.PSIRZ)
Y = len(self.PSIRZ[0])
for i in range(X):
for j in range(Y):
self.PSIRZ[i][j] += offset
def convertTo_m(self, a):
"""Determine the size unit. If the units are in mm, divide the
values by 1000.
Arguments:
a (array): Array of floats holding the R or Z values.
"""
order = int(math.log10(max([abs(el) for el in a])))
if order == 2:
# Hopefully centimeters
return [el / 100.0 for el in a]
elif order == 3:
# Hopefully mm
return [el / 1000.0 for el in a]
else:
return a
@readEqdskContentsIfRequired
def getRBBBS(self):
"""Gets the R points for LCFS.
"""
return self.convertTo_m(self.RBBBS)
@readEqdskContentsIfRequired
def getZBBBS(self):
"""Gets the Z points for LCFS.
Determine the size unit. If the units are in meters, multiply the
values by 1000.
"""
return self.convertTo_m(self.ZBBBS)
@readEqdskContentsIfRequired
def getRLIM(self):
"""Gets the R points for wall silhouette.
"""
return self.convertTo_m(self.RLIM)
@readEqdskContentsIfRequired
def getZLIM(self):
"""Gets the Z points for wall silhouette.
"""
return self.convertTo_m(self.ZLIM)
@readEqdskContentsIfRequired
def getCURRENT(self):
"""Returns plasma current in Ampere
"""
return self.CURRENT
@readEqdskContentsIfRequired
def getHEADER(self):
return self.HEADER
@readEqdskContentsIfRequired
def getPSIRZ(self):
return self.PSIRZ
@readEqdskContentsIfRequired
def getRDIM(self):
return self.RDIM
@readEqdskContentsIfRequired
def getZDIM(self):
return self.ZDIM
@readEqdskContentsIfRequired
def getNW(self):
return self.NW
@readEqdskContentsIfRequired
def getNH(self):
return self.NH
@readEqdskContentsIfRequired
def getLIMITR(self):
return self.LIMITR
@readEqdskContentsIfRequired
def getNBBBS(self):
return self.NBBBS
@readEqdskContentsIfRequired
def getRMAXIS(self):
return self.RMAXIS
@readEqdskContentsIfRequired
def getZMAXIS(self):
return self.ZMAXIS
@readEqdskContentsIfRequired
def getFPOL(self):
return self.FPOL
@readEqdskContentsIfRequired
def getPRES(self):
return self.PRES
@readEqdskContentsIfRequired
def getFFPRIM(self):
return self.FFPRIM
@readEqdskContentsIfRequired
def getPPRIME(self):
return self.PPRIME
@readEqdskContentsIfRequired
def getQPSI(self):
return self.QPSI
@readEqdskContentsIfRequired
def getSIMAG(self):
return self.SIMAG
@readEqdskContentsIfRequired
def getSIBRY(self):
return self.SIBRY
@readEqdskContentsIfRequired
def getRCENTR(self):
return self.RCENTR
@readEqdskContentsIfRequired
def getBCENTR(self):
return self.BCENTR
@readEqdskContentsIfRequired
def getRLEFT(self):
return self.RLEFT
@readEqdskContentsIfRequired
def getZMID(self):
return self.ZMID
def setHEADER(self, new):
self.HEADER = new
def setPSIRZ(self, new):
self.PSIRZ = new
def setCURRENT(self, new):
self.CURRENT = new
def setRDIM(self, new):
self.RDIM = new
def setZDIM(self, new):
self.ZDIM = new
def setNW(self, new):
self.NW = new
def setNH(self, new):
self.NH = new
def setLIMITR(self, new):
self.LIMITR = new
def setRLIM(self, new):
self.RLIM = new
def setZLIM(self, new):
self.ZLIM = new
def setNBBBS(self, new):
self.NBBBS = new
def setRBBBS(self, new):
self.RBBBS = new
def setZBBBS(self, new):
self.ZBBBS = new
def setRMAXIS(self, new):
self.RMAXIS = new
def setZMAXIS(self, new):
self.ZMAXIS = new
def setFPOL(self, new):
self.FPOL = new
def setPRES(self, new):
self.PRES = new
def setFFPRIM(self, new):
self.FFPRIM = new
def setPPRIME(self, new):
self.PPRIME = new
def setQPSI(self, new):
self.QPSI = new
def setSIMAG(self, new):
self.SIMAG = new
def setSIBRY(self, new):
self.SIBRY = new
def setRCENTR(self, new):
self.RCENTR = new
def setBCENTR(self, new):
self.BCENTR = new
def setRLEFT(self, new):
self.RLEFT = new
def setZMID(self, new):
self.ZMID = new
class EQDSK_RBSpline(object):
"""Class for interpolating EQDSK-G data for finding the LCFS.
"""
def __init__(self):
self.EQDSK = None
self.INTRP = None
self.NW = None
self.R1 = None
self.R2 = None
self.NH = None
self.Z1 = None
self.Z2 = None
def setEqdskObject(self, obj):
self.EQDSK = obj
def readEqdskFile(self, file):
with open(file, 'r') as f:
string = f.read()
obj = EQDSK()
obj.read(string)
self.EQDSK = obj
def interpolate(self):
if not self.EQDSK:
return False
if not self.EQDSK.readOk():
return False
# Width, Height
self.NW = self.EQDSK.getNW()
self.NH = self.EQDSK.getNH()
# R values
self.RDIM = self.EQDSK.getRDIM()
self.R1 = self.EQDSK.getRLEFT()
self.R2 = self.R1 + self.RDIM
R = np.linspace(self.R1, self.R2, self.NW)
# Z values
self.ZMID = self.EQDSK.getZMID()
self.ZDIM = self.EQDSK.getZDIM()
self.Z1 = self.ZMID - 0.5 * self.ZDIM
self.Z2 = self.ZMID + 0.5 * self.ZDIM
Z = np.linspace(self.Z1, self.Z2, self.NH)
self.PSIRZ = np.asarray(self.EQDSK.getPSIRZ())
from scipy.interpolate import RectBivariateSpline
self.INTRP = RectBivariateSpline(Z, R, self.PSIRZ, kx=5, ky=5, s=0.0)
return True
def findStartBisectR(self, value, R1, R2, Z):
"""Find the starting point using bisection
"""
def bisectFunction(R, Z, value):
"""Traverse on a line where the Z value if constant
"""
return self.INTRP.ev(Z, R) - value
from scipy.optimize import bisect
return bisect(bisectFunction, R1, R2, maxiter=10000, args=(Z, value))
def findStartBisectZ(self, value, Z1, Z2, R):
"""Find the starting point using bisection
"""
def bisectFunction(Z, R, value):
"""Traverse on a line where the Z value if constant
"""
return self.INTRP.ev(Z, R) - value
from scipy.optimize import bisect
return bisect(bisectFunction, Z1, Z2, maxiter=10000, args=(R, value))
def _findCurve(self, pos, value, step, tolerance):
i = 0
R = [pos[1]]
Z = [pos[0]]
_ok = False
while i < 1e5:
# Get derivatives
der = self.INTRP.ev(pos[0], pos[1], dx=1), \
self.INTRP.ev(pos[0], pos[1], dy=1)
rot90 = [der[1], -der[0]]
rot90 /= np.linalg.norm(rot90)
newPos = [0.0, 0.0]
newPos[0] = pos[0] + step * rot90[0]
newPos[1] = pos[1] + step * rot90[1]
deltaPsi = value - self.INTRP.ev(*newPos)
corrRot90 = [rot90[1], -rot90[0]] # Already normed
der = self.INTRP.ev(newPos[0], newPos[1], dx=1), \
self.INTRP.ev(newPos[0], newPos[1], dy=1)
scale = deltaPsi / np.linalg.norm(der)
newPos[0] -= scale * corrRot90[0]
newPos[1] -= scale * corrRot90[1]
if i > 5000:
disR = R[0] - newPos[1]
disZ = Z[0] - newPos[0]
distance = disR * disR + disZ * disZ
# if distance < 1:
# print(distance)
if distance <= tolerance:
_ok = True
break
if newPos[1] < self.R1 or newPos[1] > self.R2:
# Out of bounds
break
if newPos[0] < self.Z1 or newPos[0] > self.Z2:
# Out of bounds
break
R.append(newPos[1])
Z.append(newPos[0])
pos = newPos
i += 1
return _ok, R, Z
def findIsoContour(self, value, movementStep=1e-3, tolerance=1e-6):
# Scan for intervals where the value is inside of it.
_ok = False
points = []
for i in range(10):
for j in range(10):
r1 = self.R1 + i * self.RDIM / 10.0
r2 = r1 + self.RDIM / 10.0
z_h = self.Z1 + (0.5 + j) * self.ZDIM / 10.0
v1 = self.INTRP.ev(z_h, r1) - value
v2 = self.INTRP.ev(z_h, r2) - value
if v1 * v2 < 0:
_t = 'Horizontal'
_ok = True
# print('Z', z_h, 'R', r1, r2, v1, v2)
points.append(['H', z_h, r1, r2])
continue
z1 = self.Z1 + j * self.ZDIM / 10.0
z2 = z1 + self.ZDIM / 10.0
r_h = self.R1 + (0.5 + i) * self.RDIM / 10.0
v1 = self.INTRP.ev(z1, r_h) - value
v2 = self.INTRP.ev(z2, r_h) - value
if v1 * v2 < 0:
_t = 'Vertical'
_ok = True
# print('R', r_h, 'Z', z1, z2, v1, v2)
points.append(['V', r_h, z1, z2])
if not _ok:
print('Failed to find value in the sweep.')
return [], []
for _set in points:
_t = _set[0]
half, c1, c2 = _set[1:]
if _t == 'H':
pos = [half, self.findStartBisectR(value, c1, c2, half)]
else:
pos = [self.findStartBisectZ(value, c1, c2, half), half]
_ok, R, Z = self._findCurve(pos, value, movementStep, tolerance)
if _ok:
return R, Z
return [], []