# Source code for baseclasses.solvers.pyAero_solver

```
"""
pyAero_solver
Holds the Python Aerodynamic Analysis Classes (base).
"""
# =============================================================================
# Standard Python modules
# =============================================================================
import numpy as np
# =============================================================================
# Extension modules
# =============================================================================
from .BaseSolver import BaseSolver
from ..utils import CaseInsensitiveDict, Error
# =============================================================================
# AeroSolver Class
# =============================================================================
[docs]
class AeroSolver(BaseSolver):
"""
Abstract Class for Aerodynamic Solver Object
"""
def __init__(
self,
name,
category,
defaultOptions={},
options={},
immutableOptions=set(),
deprecatedOptions={},
comm=None,
informs={},
):
"""
AeroSolver Class Initialization
"""
# Setup option info
super().__init__(
name,
category=category,
defaultOptions=defaultOptions,
options=options,
immutableOptions=immutableOptions,
deprecatedOptions=deprecatedOptions,
comm=comm,
informs=informs,
)
self.families = CaseInsensitiveDict()
self._updateGeomInfo = False
# Initialize kwargs for addPointSet and customPointSetFamilies
self.pointSetKwargs = None
self.customPointSetFamilies = None
[docs]
def setMesh(self, mesh):
"""
Set the mesh object to the aero_solver to do geometric deformations
Parameters
----------
mesh : MBMesh or USMesh object
The mesh object for doing the warping
"""
# Store a reference to the mesh
self.mesh = mesh
# Setup External Warping with volume indices
meshInd = self.getSolverMeshIndices()
self.mesh.setExternalMeshIndices(meshInd)
# Set the surface the user has supplied:
conn, faceSizes = self.getSurfaceConnectivity(self.meshFamilyGroup)
pts = self.getSurfaceCoordinates(self.meshFamilyGroup)
self.mesh.setSurfaceDefinition(pts, conn, faceSizes)
[docs]
def setDVGeo(self, DVGeo, pointSetKwargs=None, customPointSetFamilies=None):
"""
Set the DVGeometry object that will manipulate 'geometry' in
this object. Note that <SOLVER> does not **strictly** need a
DVGeometry object, but if optimization with geometric
changes is desired, then it is required.
Parameters
----------
DVGeo : A DVGeometry object.
Object responsible for manipulating the geometry.
pointSetKwargs : dict
Keyword arguments to be passed to the DVGeo addPointSet call.
Useful for DVGeometryMulti, specifying FFD projection tolerances, etc.
These arguments are used for all point sets added by this solver.
customPointSetFamilies : dict of dicts
This argument is used to split up the surface points added to the DVGeo by the solver into potentially
multiple subsets. The keys of the dictionary will be used to determine what families should be
added to the dvgeo object as separate point sets. The values of each key is another dictionary, which can be empty.
If desired, the inner dictionaries can contain custom kwargs for the addPointSet call for each surface family,
specified by the keys of the top level dictionary.
The surface families need to be all part of the designSurfaceFamily.
Useful for DVGeometryMulti, specifying FFD projection tolerances, etc.
If this is provided together with pointSetKwargs, the regular pointSetKwargs
will be appended to each component's dictionary. If the same argument
is also provided in pointSetKwargs, the value specified in customPointSetFamilies
will be used.
Examples
--------
>>> CFDsolver = <SOLVER>(comm=comm, options=CFDoptions)
>>> CFDsolver.setDVGeo(DVGeo)
"""
self.DVGeo = DVGeo
# save the common kwargs dict. default is empty
if pointSetKwargs is None:
self.pointSetKwargs = {}
else:
self.pointSetKwargs = pointSetKwargs
# save if we have customPointSetFamilies. this default is not mutable so we can just set it as is.
self.customPointSetFamilies = customPointSetFamilies
[docs]
def getTriangulatedMeshSurface(self, groupName=None, **kwargs):
"""
This function returns a trianguled verision of the surface
mesh on all processors. The intent is to use this for doing
constraints in DVConstraints.
Returns
-------
surf : list
List of points and vectors describing the surface. This may
be passed directly to DVConstraint setSurface() function.
"""
if groupName is None:
groupName = self.allWallsGroup
# Obtain the points and connectivity for the specified
# groupName
pts = self.comm.allgather(self.getSurfaceCoordinates(groupName, **kwargs))
conn, faceSizes = self.getSurfaceConnectivity(groupName)
conn = np.array(conn).flatten()
conn = self.comm.allgather(conn)
faceSizes = self.comm.allgather(faceSizes)
# Triangle info...point and two vectors
p0 = []
v1 = []
v2 = []
# loop over the faces
for iProc in range(len(faceSizes)):
connCounter = 0
for iFace in range(len(faceSizes[iProc])):
# Get the number of nodes on this face
faceSize = faceSizes[iProc][iFace]
faceNodes = conn[iProc][connCounter : connCounter + faceSize]
# Start by getting the centerpoint
ptSum = [0, 0, 0]
for i in range(faceSize):
# idx = ptCounter+i
idx = faceNodes[i]
ptSum += pts[iProc][idx]
avgPt = ptSum / faceSize
# Now go around the face and add a triangle for each adjacent pair
# of points. This assumes an ordered connectivity from the
# meshwarping
for i in range(faceSize):
idx = faceNodes[i]
p0.append(avgPt)
v1.append(pts[iProc][idx] - avgPt)
if i < (faceSize - 1):
idxp1 = faceNodes[i + 1]
v2.append(pts[iProc][idxp1] - avgPt)
else:
# wrap back to the first point for the last element
idx0 = faceNodes[0]
v2.append(pts[iProc][idx0] - avgPt)
# Now increment the connectivity
connCounter += faceSize
return [p0, v1, v2]
[docs]
def writeTriangulatedSurfaceTecplot(self, fileName, groupName=None, **kwargs):
"""
Write the triangulated surface mesh from the solver in tecplot.
Parameters
----------
fileName : str
File name for tecplot file. Should have a .dat extension.
groupName : str
Set of boundaries to include in the surface.
"""
[p0, v1, v2] = self.getTriangulatedMeshSurface(groupName, **kwargs)
if self.comm.rank == 0:
f = open(fileName, "w")
f.write('TITLE = "%s Surface Mesh"\n' % self.name)
f.write('VARIABLES = "CoordinateX" "CoordinateY" "CoordinateZ"\n')
f.write("Zone T=%s\n" % ("surf"))
f.write("Nodes = %d, Elements = %d ZONETYPE=FETRIANGLE\n" % (len(p0) * 3, len(p0)))
f.write("DATAPACKING=POINT\n")
for i in range(len(p0)):
points = []
points.append(p0[i])
points.append(p0[i] + v1[i])
points.append(p0[i] + v2[i])
for i in range(len(points)):
f.write(f"{points[i][0]:f} {points[i][1]:f} {points[i][2]:f}\n")
for i in range(len(p0)):
f.write("%d %d %d\n" % (3 * i + 1, 3 * i + 2, 3 * i + 3))
f.close()
[docs]
def checkSolutionFailure(self, aeroProblem, funcs):
"""Take in a an aeroProblem and check for failure. Then append the
fail flag in funcs. Information regarding whether or not the
last analysis with the aeroProblem was sucessful is
included. This information is included as "funcs['fail']". If
the 'fail' entry already exits in the dictionary the following
operation is performed:
funcs['fail'] = funcs['fail'] or <did this problem fail>
In other words, if any one problem fails, the funcs['fail']
entry will be True. This information can then be used
directly in multiPointSparse. For direct interface with pyOptSparse
the fail flag needs to be returned separately from the funcs.
Parameters
----------
aeroProblem : pyAero_problem class
The aerodynamic problem to to get the solution for
funcs : dict
Dictionary into which the functions are saved.
"""
self.setAeroProblem(aeroProblem)
# We also add the fail flag into the funcs dictionary. If fail
# is already there, we just logically 'or' what was
# there. Otherwise we add a new entry.
failFlag = self.curAP.solveFailed or self.curAP.fatalFail
if "fail" in funcs:
funcs["fail"] = funcs["fail"] or failFlag
else:
funcs["fail"] = failFlag
[docs]
def checkAdjointFailure(self, aeroProblem, funcsSens):
"""Take in a an aeroProblem and check for adjoint failure, Then append the
fail flag in funcsSens. Information regarding whether or not the
last analysis with the aeroProblem was sucessful is
included. This information is included as "funcsSens['fail']". If
the 'fail' entry already exits in the dictionary the following
operation is performed:
funcsSens['fail'] = funcsSens['fail'] or <did this problem fail>
In other words, if any one problem fails, the funcsSens['fail']
entry will be True. This information can then be used
directly in multiPointSparse. For direct interface with pyOptSparse
the fail flag needs to be returned separately from the funcs.
Parameters
----------
aeroProblem : pyAero_problem class
The aerodynamic problem to to get the solution for
funcsSens : dict
Dictionary into which the functions are saved.
"""
self.setAeroProblem(aeroProblem)
# We also add the fail flag into the funcs dictionary. If fail
# is already there, we just logically 'or' what was
# there. Otherwise we add a new entry.
failFlag = self.curAP.adjointFailed
if "fail" in funcsSens:
funcsSens["fail"] = funcsSens["fail"] or failFlag
else:
funcsSens["fail"] = failFlag
[docs]
def addFamilyGroup(self, groupName, families):
"""Add a custom grouping of families called groupName. The groupName
must be distinct from the existing families. All families must
in the 'families' list must be present in the CGNS file.
Parameters
----------
groupName : str
User-supplied custom name for the family groupings
families : list
List of string. Family names to combine into the family group
"""
# Do some error checking
if groupName in self.families:
raise Error(
"The specified groupName '%s' already exists in the " "cgns file or has already been added." % groupName
)
# We can actually allow for nested groups. That is, an entry
# in families may already be a group added in a previous call.
indices = []
for fam in families:
if fam not in self.families:
raise Error(
"The specified family '%s' for group '%s', does "
"not exist in the cgns file or has "
"not already been added. The current list of "
"families (original and grouped) is: %s" % (fam, groupName, repr(self.families.keys()))
)
indices.extend(self.families[fam])
# It is very important that the list of families is sorted
# becuase in fortran we always use a binary search to check if
# a famID is in the list.
self.families[groupName] = sorted(np.unique(indices))
[docs]
def getSurfaceCoordinates(self, group_name):
"""
Return the set of surface coordinates cooresponding to a
Particular group name
"""
pass
def getInitialSurfaceCoordinates(self, groupName=None):
""""""
if groupName is None:
groupName = self.allWallsGroup
if self.mesh is not None:
if self.DVGeo is not None:
# if we have a geometry object, return the undeflected
# shape generated directly from the design variables
ptSetName = self.getPointSetName(self.curAP.name)
self.setSurfaceCoordinates(self.DVGeo.update(ptSetName, config=self.curAP.name), self.designFamilyGroup)
self.updateGeometryInfo()
return self.getSurfaceCoordinates(groupName)
else:
# otherwise, the initial mesh is the undeflected mesh, so
# return that
coords = self.mapVector(self.coords0, self.allFamilies, groupName)
return coords
[docs]
def setSurfaceCoordinates(self, coordinates, groupName=None):
"""
Set the updated surface coordinates for a particular group.
Parameters
----------
coordinates : numpy array
Numpy array of size Nx3, where N is the number of coordinates on this processor.
This array must have the same shape as the array obtained with getSurfaceCoordinates()
groupName : str
Name of family or group of families for which to return coordinates for.
"""
if self.mesh is None:
return
if groupName is None:
groupName = self.allWallsGroup
self._updateGeomInfo = True
if self.mesh is None:
raise Error("Cannot set new surface coordinate locations without a mesh" "warping object present.")
# First get the surface coordinates of the meshFamily in case
# the groupName is a subset, those values will remain unchanged.
meshSurfCoords = self.getSurfaceCoordinates(self.meshFamilyGroup)
meshSurfCoords = self.mapVector(coordinates, groupName, self.meshFamilyGroup, meshSurfCoords)
self.mesh.setSurfaceCoordinates(meshSurfCoords)
[docs]
def getForces(self, group_name):
"""
Return the set of forces at the locations defined by
getSurfaceCoordinates
"""
pass
[docs]
def globalNKPreCon(self, in_vec):
"""
Precondition the residual in in_vec for a coupled
Newton-Krylov Method
"""
pass
[docs]
def totalSurfaceDerivative(self, objective):
"""
Return the total derivative of the objective at surface
coordinates
"""
pass
[docs]
def totalAeroDerivative(self, objective):
"""
Return the total derivative of the objective with respect
to aerodynamic-only variables
"""
pass
[docs]
def getResNorms(self):
"""
Return the inital,starting and final residual norms for
the solver
"""
pass
[docs]
def getStateSize(self):
"""
Return the number of degrees of freedom (states) that are
on this processor
"""
pass
[docs]
def solveAdjoint(self, objective, *args, **kwargs):
"""
Solve the adjoint problem for the desired objective functions.
objectives - List of objective functions
"""
pass
[docs]
def printFamilyList(self):
"""
Print a nicely formatted dictionary of the family names
"""
from pprint import pprint
pprint(self.families)
# --------------------------
# Private Utility functions
# --------------------------
def _getFamilyList(self, groupName):
if groupName is None:
groupName = self.allFamilies
if groupName not in self.families:
raise Error(
"'%s' is not a family in the CGNS file or has not been added"
" as a combination of families" % groupName
)
return self.families[groupName]
```