diff --git a/geos-mesh/src/geos/mesh/fault/DMZFinder.py b/geos-mesh/src/geos/mesh/fault/DMZFinder.py new file mode 100644 index 00000000..08129648 --- /dev/null +++ b/geos-mesh/src/geos/mesh/fault/DMZFinder.py @@ -0,0 +1,191 @@ +# ------------------------------------------------------------------------------------------------------------ +# SPDX-License-Identifier: LGPL-2.1-only +# +# Copyright (c) 2016-2024 Lawrence Livermore National Security LLC +# Copyright (c) 2018-2024 TotalEnergies +# Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University +# Copyright (c) 2023-2024 Chevron +# Copyright (c) 2019- GEOS/GEOSX Contributors +# All rights reserved +# +# See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details. +# ------------------------------------------------------------------------------------------------------------ +import numpy as np +from typing_extensions import Self +from vtkmodules.vtkCommonDataModel import vtkUnstructuredGrid +from vtkmodules.numpy_interface import dataset_adapter as dsa +from geos.mesh.utils.genericHelpers import find_2D_cell_ids, find_cells_near_faces +from geos.utils.Logger import logging, Logger, getLogger + +__doc__ = """ +Find Damage/Fractured Zone (DMZ) cells in mesh based on proximity to fault faces. + +Input mesh is vtkUnstructuredGrid and output is the same mesh with added DMZ array. + +The DMZ is identified by finding cells within a specified distance from fault faces +in a given active region. + +To use a handler of yours for the logger, set the variable 'speHandler' to True and add it to the filter +with the member function setLoggerHandler. + +To use it: + +.. code-block:: python + + from geos.mesh.fault.DMZFinder import DMZFinder + + # Filter inputs. + unstructuredGrid: vtkUnstructuredGrid + region_array_name: str = "attribute" + active_region_id: int = 0 + active_fault_id: int = 0 + output_array_name: str = "isDMZ" + dmz_length: float = 100.0 + # Optional inputs. + speHandler: bool + + # Instantiate the filter. + filter: DMZFinder = DMZFinder( unstructuredGrid, region_array_name, active_region_id, + active_fault_id, output_array_name, dmz_length, speHandler ) + + # Set the handler of yours (only if speHandler is True). + yourHandler: logging.Handler + filter.setLoggerHandler( yourHandler ) + + # Do calculations. + filter.applyFilter() +""" + +loggerTitle: str = "DMZ Finder" + + +class DMZFinder: + + def __init__( + self: Self, + unstructuredGrid: vtkUnstructuredGrid, + region_array_name: str = "attribute", + active_region_id: int = 0, + active_fault_id: int = 0, + output_array_name: str = "isDMZ", + dmz_length: float = 100.0, + speHandler: bool = False, + ) -> None: + """Find Damage/Fractured Zone (DMZ) cells in mesh. + + Args: + unstructuredGrid (vtkUnstructuredGrid): The mesh where to find DMZ cells. + region_array_name (str, optional): Name of the region array. Defaults to "attribute". + active_region_id (int, optional): ID of the active region. Defaults to 0. + active_fault_id (int, optional): ID of the active fault. Defaults to 0. + output_array_name (str, optional): Name of the output DMZ array. Defaults to "isDMZ". + dmz_length (float, optional): Distance for DMZ identification. Defaults to 100.0. + speHandler (bool, optional): True to use a specific handler, False to use the internal handler. + Defaults to False. + """ + self.unstructuredGrid: vtkUnstructuredGrid = unstructuredGrid + self.region_array_name: str = region_array_name + self.active_region_id: int = active_region_id + self.active_fault_id: int = active_fault_id + self.output_array_name: str = output_array_name + self.dmz_length: float = dmz_length + + # Logger. + self.logger: Logger + if not speHandler: + self.logger = getLogger( loggerTitle, True ) + else: + self.logger = logging.getLogger( loggerTitle ) + self.logger.setLevel( logging.INFO ) + + def setLoggerHandler( self: Self, handler: logging.Handler ) -> None: + """Set a specific handler for the filter logger. + + In this filter 4 log levels are used, .info, .error, .warning and .critical, + be sure to have at least the same 4 levels. + + Args: + handler (logging.Handler): The handler to add. + """ + if not self.logger.hasHandlers(): + self.logger.addHandler( handler ) + else: + self.logger.warning( "The logger already has a handler, to use yours set the argument 'speHandler' to True " + "during the filter initialization." ) + + def applyFilter( self: Self ) -> bool: + """Apply the DMZ finding algorithm to the mesh. + + Returns: + bool: True if calculation successfully ended, False otherwise. + """ + self.logger.info( f"Apply filter { self.logger.name }." ) + + input_grid = dsa.WrapDataObject( self.unstructuredGrid ) + + if not self._check_inputs( input_grid ): + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + # Get the array that defines the geological regions + region_array = input_grid.CellData[ self.region_array_name ] + + all_mesh_face_ids: set[ int ] = find_2D_cell_ids( self.unstructuredGrid ) + if not all_mesh_face_ids: + self.logger.error( "No 2D face cells found." ) + self.logger.error( f"The filter { self.logger.name } failed." ) + return False + + number_cells: int = self.unstructuredGrid.GetNumberOfCells() + # identify which faces are in the active region + all_faces_mask = np.zeros( number_cells, dtype=bool ) + all_faces_mask[ list( all_mesh_face_ids ) ] = True + active_region_mask = ( region_array == self.active_region_id ) + active_fault_mask = ( region_array == self.active_fault_id ) + active_fault_faces_mask = np.logical_and( all_faces_mask, active_fault_mask ) + active_fault_faces: set[ int ] = set( np.where( active_fault_faces_mask )[ 0 ] ) + cells_near_faces: set[ int ] = find_cells_near_faces( self.unstructuredGrid, active_fault_faces, + self.dmz_length ) + + # Identify which cells are in the DMZ + dmz_mask = np.zeros( number_cells, dtype=bool ) + dmz_mask[ list( cells_near_faces ) ] = True + final_mask = np.logical_and( dmz_mask, active_region_mask ) + + # Create the new CellData array + new_region_id_array = np.zeros( region_array.shape, dtype=int ) + new_region_id_array[ final_mask ] = 1 + + # Add new isDMZ array to the input_grid (which modifies the original mesh) + input_grid.CellData.append( new_region_id_array, self.output_array_name ) + + self.logger.info( f"The filter { self.logger.name } succeed." ) + return True + + def _check_inputs( self: Self, dsa_mesh: dsa.UnstructuredGrid ) -> bool: + """Check if inputs are valid. + + Args: + dsa_mesh (dsa.UnstructuredGrid): The wrapped mesh to check. + + Returns: + bool: True if inputs are valid, False otherwise. + """ + if self.output_array_name == self.region_array_name: + self.logger.error( "Output array name cannot be the same as region array name." ) + return False + + if dsa_mesh is None: + self.logger.error( "Input mesh is not set." ) + return False + + region_array = dsa_mesh.CellData[ self.region_array_name ] + if region_array is None: + self.logger.error( f"Region array '{self.region_array_name}' is not found in input mesh." ) + return False + else: + if self.active_region_id not in region_array: + self.logger.error( f"Active region ID '{self.active_region_id}' is not found in region array." ) + return False + + return True diff --git a/geos-mesh/src/geos/mesh/utils/genericHelpers.py b/geos-mesh/src/geos/mesh/utils/genericHelpers.py index de0624fd..51f4f23b 100644 --- a/geos-mesh/src/geos/mesh/utils/genericHelpers.py +++ b/geos-mesh/src/geos/mesh/utils/genericHelpers.py @@ -4,10 +4,12 @@ import numpy as np import numpy.typing as npt from typing import Iterator, List, Sequence, Any, Union -from vtkmodules.util.numpy_support import numpy_to_vtk +from vtkmodules.util.numpy_support import numpy_to_vtk, vtk_to_numpy from vtkmodules.vtkCommonCore import vtkIdList, vtkPoints, reference -from vtkmodules.vtkCommonDataModel import vtkUnstructuredGrid, vtkMultiBlockDataSet, vtkPolyData, vtkDataSet, vtkDataObject, vtkPlane, vtkCellTypes, vtkIncrementalOctreePointLocator -from vtkmodules.vtkFiltersCore import vtk3DLinearGridPlaneCutter +from vtkmodules.vtkCommonDataModel import ( vtkUnstructuredGrid, vtkMultiBlockDataSet, vtkPolyData, vtkDataSet, + vtkDataObject, vtkPlane, vtkCellTypes, vtkIncrementalOctreePointLocator, + vtkKdTreePointLocator, VTK_QUAD, VTK_TRIANGLE ) +from vtkmodules.vtkFiltersCore import vtk3DLinearGridPlaneCutter, vtkCellCenters from geos.mesh.utils.multiblockHelpers import ( getBlockElementIndexesFlatten, getBlockFromFlatIndex ) __doc__ = """ @@ -53,6 +55,71 @@ def vtk_iter( vtkContainer: vtkIdList | vtkCellTypes ) -> Iterator[ Any ]: yield vtkContainer.GetCellType( i ) +def find_2D_cell_ids( mesh: vtkUnstructuredGrid ) -> set[ int ]: + """ + Find 2D cell IDs in a 3D unstructured grid. + + Args: + mesh (vtkUnstructuredGrid): The input 3D mesh. + + Returns: + set[int]: A set of 2D cell IDs. + """ + cell_types_vtk_array = mesh.GetCellTypesArray() + if not cell_types_vtk_array: + return set() + + cell_types_numpy_array = vtk_to_numpy( cell_types_vtk_array ) + triangle_ids = np.where( cell_types_numpy_array == VTK_TRIANGLE )[ 0 ] + quad_ids = np.where( cell_types_numpy_array == VTK_QUAD )[ 0 ] + return set( np.concatenate( ( triangle_ids, quad_ids ) ) ) + + +def find_cells_near_faces( mesh: vtkUnstructuredGrid, face_ids: set[ int ], distance: float ) -> set[ int ]: + """ + Given a vtkUnstructuredGrid, a list of face cell IDs, and a distance, find unique cell IDs + whose centroids are within the distance to at least one face centroid. + + Args: + mesh (vtkUnstructuredGrid) + face_ids (set[int]): cell IDs of the faces + distance (float): search radius + + Returns: + set(int): unique cell IDs + """ + # Compute cell centers for all cells + cell_centers_filter = vtkCellCenters() + cell_centers_filter.SetInputData( mesh ) + cell_centers_filter.Update() + centers_polydata: vtkPolyData = cell_centers_filter.GetOutput() + + # Build kd-tree point locator on the cell centers + locator = vtkKdTreePointLocator() + locator.SetDataSet( centers_polydata ) + locator.BuildLocator() + + # Use a set to collect unique cell IDs + nearby_cells = set() + + # For each face, get its centroid and find nearby points (cell IDs) + points: vtkPoints = centers_polydata.GetPoints() + for face_id in face_ids: + # Get face centroid + face_centroid = points.GetPoint( face_id ) + + # Find points within radius + result = vtkIdList() + locator.FindPointsWithinRadius( distance, face_centroid, result ) + + # Add to set + for j in range( result.GetNumberOfIds() ): + cell_id = result.GetId( j ) + nearby_cells.add( cell_id ) + + return nearby_cells + + def extractSurfaceFromElevation( mesh: vtkUnstructuredGrid, elevation: float ) -> vtkPolyData: """Extract surface at a constant elevation from a mesh. diff --git a/geos-pv/src/geos/pv/plugins/PVDMZFinder.py b/geos-pv/src/geos/pv/plugins/PVDMZFinder.py new file mode 100644 index 00000000..e1ff8355 --- /dev/null +++ b/geos-pv/src/geos/pv/plugins/PVDMZFinder.py @@ -0,0 +1,183 @@ +# ------------------------------------------------------------------------------------------------------------ +# SPDX-License-Identifier: LGPL-2.1-only +# +# Copyright (c) 2016-2024 Lawrence Livermore National Security LLC +# Copyright (c) 2018-2024 TotalEnergies +# Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University +# Copyright (c) 2023-2024 Chevron +# Copyright (c) 2019- GEOS/GEOSX Contributors +# All rights reserved +# +# See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details. +# ------------------------------------------------------------------------------------------------------------ +import sys +from pathlib import Path +from typing_extensions import Self +from paraview.util.vtkAlgorithm import VTKPythonAlgorithmBase, smdomain, smhint, smproperty, smproxy +from paraview.detail.loghandler import VTKHandler +from vtkmodules.vtkCommonDataModel import vtkUnstructuredGrid +from vtkmodules.vtkCommonCore import vtkInformation, vtkInformationVector + +# update sys.path to load all GEOS Python Package dependencies +geos_pv_path: Path = Path( __file__ ).parent.parent.parent.parent.parent +sys.path.insert( 0, str( geos_pv_path / "src" ) ) +from geos.pv.utils.config import update_paths + +update_paths() + +from geos.mesh.fault.DMZFinder import DMZFinder + +__doc__ = """ +PVDMZFinder is a Paraview plugin that finds Damage/Fractured Zone (DMZ) cells in mesh. + +Input and output types are vtkUnstructuredGrid. + +This filter results in a single output pipeline that contains the volume mesh with added DMZ array. + +To use it: + +* Load the module in Paraview: Tools>Manage Plugins...>Load new>PVDMZFinder.py +* Select the .vtu grid loaded in Paraview. +* Set the parameters (Fault ID, Region ID, DZ Length, etc.) +* Apply. + +""" + + +@smproxy.filter( name="PVDMZFinder", label="Damage Zone Finder" ) +@smhint.xml( '' ) +@smproperty.input( name="Input", port_index=0 ) +@smdomain.datatype( dataTypes=[ "vtkUnstructuredGrid" ], composite_data_supported=True ) +class PVDMZFinder( VTKPythonAlgorithmBase ): + + def __init__( self: Self ) -> None: + """Find Damage/Fractured Zone (DMZ) cells in mesh.""" + super().__init__( nInputPorts=1, + nOutputPorts=1, + inputType="vtkUnstructuredGrid", + outputType="vtkUnstructuredGrid" ) + + self.region_array_name: str = "attribute" + self.active_region_id: int = 0 + self.active_fault_id: int = 0 + self.output_array_name: str = "isDMZ" + self.dmz_length: float = 100.0 + + @smproperty.intvector( name="Fault ID", default_values=0, number_of_elements=1 ) + def SetActiveFaultID( self: Self, value: int ) -> None: + """Set the active fault ID. + + Args: + value (int): The fault ID to set. + """ + if self.active_fault_id != value: + self.active_fault_id = value + self.Modified() + + @smproperty.intvector( name="Region ID", default_values=0, number_of_elements=1 ) + def SetActiveRegionID( self: Self, value: int ) -> None: + """Set the active region ID. + + Args: + value (int): The region ID to set. + """ + if self.active_region_id != value: + self.active_region_id = value + self.Modified() + + @smproperty.doublevector( name="DZ Length", default_values=100.0, number_of_elements=1 ) + def SetDmzLength( self: Self, value: float ) -> None: + """Set the DMZ length. + + Args: + value (float): The DMZ length to set. + """ + if self.dmz_length != value: + self.dmz_length = value + self.Modified() + + @smproperty.stringvector( name="Output Array Name", default_values="isDMZ", number_of_elements=1 ) + def SetOutputArrayName( self: Self, name: str ) -> None: + """Set the output array name. + + Args: + name (str): The output array name to set. + """ + if self.output_array_name != name: + self.output_array_name = name + self.Modified() + + @smproperty.stringvector( name="Region Array Name", default_values="attribute", number_of_elements=1 ) + def SetRegionArrayName( self: Self, name: str ) -> None: + """Set the region array name. + + Args: + name (str): The region array name to set. + """ + if self.region_array_name != name: + self.region_array_name = name + self.Modified() + + def RequestDataObject( + self: Self, + request: vtkInformation, + inInfoVec: list[ vtkInformationVector ], + outInfoVec: vtkInformationVector, + ) -> int: + """Inherited from VTKPythonAlgorithmBase::RequestDataObject. + + Args: + request (vtkInformation): Request + inInfoVec (list[vtkInformationVector]): Input objects + outInfoVec (vtkInformationVector): Output objects + + Returns: + int: 1 if calculation successfully ended, 0 otherwise. + """ + inData = self.GetInputData( inInfoVec, 0, 0 ) + outData = self.GetOutputData( outInfoVec, 0 ) + assert inData is not None + if outData is None or ( not outData.IsA( inData.GetClassName() ) ): + outData = inData.NewInstance() + outInfoVec.GetInformationObject( 0 ).Set( outData.DATA_OBJECT(), outData ) + return super().RequestDataObject( request, inInfoVec, outInfoVec ) # type: ignore[no-any-return] + + def RequestData( + self: Self, + request: vtkInformation, # noqa: F841 + inInfoVec: list[ vtkInformationVector ], + outInfoVec: vtkInformationVector, + ) -> int: + """Inherited from VTKPythonAlgorithmBase::RequestData. + + Args: + request (vtkInformation): Request + inInfoVec (list[vtkInformationVector]): Input objects + outInfoVec (vtkInformationVector): Output objects + + Returns: + int: 1 if calculation successfully ended, 0 otherwise. + """ + inputMesh: vtkUnstructuredGrid = self.GetInputData( inInfoVec, 0, 0 ) + outputMesh: vtkUnstructuredGrid = self.GetOutputData( outInfoVec, 0 ) + assert inputMesh is not None, "Input mesh is null." + assert outputMesh is not None, "Output pipeline is null." + + outputMesh.ShallowCopy( inputMesh ) + + filter: DMZFinder = DMZFinder( + outputMesh, + self.region_array_name, + self.active_region_id, + self.active_fault_id, + self.output_array_name, + self.dmz_length, + True, + ) + + if not filter.logger.hasHandlers(): + filter.setLoggerHandler( VTKHandler() ) + + filter.applyFilter() + + return 1