Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions geos-mesh/src/geos/mesh/fault/DMZFinder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# ------------------------------------------------------------------------------------------------------------
# SPDX-License-Identifier: LGPL-2.1-only
#
# Copyright (c) 2016-2024 Lawrence Livermore National Security LLC
# Copyright (c) 2018-2024 TotalEnergies
# Copyright (c) 2018-2024 The Board of Trustees of the Leland Stanford Junior University
# Copyright (c) 2023-2024 Chevron
# Copyright (c) 2019- GEOS/GEOSX Contributors
# All rights reserved
#
# See top level LICENSE, COPYRIGHT, CONTRIBUTORS, NOTICE, and ACKNOWLEDGEMENTS files for details.
# ------------------------------------------------------------------------------------------------------------
import numpy as np
from typing_extensions import Self
from vtkmodules.vtkCommonDataModel import vtkUnstructuredGrid
from vtkmodules.numpy_interface import dataset_adapter as dsa
from geos.mesh.utils.genericHelpers import find_2D_cell_ids, find_cells_near_faces
from geos.utils.Logger import logging, Logger, getLogger

__doc__ = """
Find Damage/Fractured Zone (DMZ) cells in mesh based on proximity to fault faces.

Input mesh is vtkUnstructuredGrid and output is the same mesh with added DMZ array.

The DMZ is identified by finding cells within a specified distance from fault faces
in a given active region.

To use a handler of yours for the logger, set the variable 'speHandler' to True and add it to the filter
with the member function setLoggerHandler.

To use it:

.. code-block:: python

from geos.mesh.fault.DMZFinder import DMZFinder

# Filter inputs.
unstructuredGrid: vtkUnstructuredGrid
region_array_name: str = "attribute"
active_region_id: int = 0
active_fault_id: int = 0
output_array_name: str = "isDMZ"
dmz_length: float = 100.0
# Optional inputs.
speHandler: bool

# Instantiate the filter.
filter: DMZFinder = DMZFinder( unstructuredGrid, region_array_name, active_region_id,
active_fault_id, output_array_name, dmz_length, speHandler )

# Set the handler of yours (only if speHandler is True).
yourHandler: logging.Handler
filter.setLoggerHandler( yourHandler )

# Do calculations.
filter.applyFilter()
"""

loggerTitle: str = "DMZ Finder"


class DMZFinder:

def __init__(
self: Self,
unstructuredGrid: vtkUnstructuredGrid,
region_array_name: str = "attribute",
active_region_id: int = 0,
active_fault_id: int = 0,
output_array_name: str = "isDMZ",
dmz_length: float = 100.0,
speHandler: bool = False,
) -> None:
"""Find Damage/Fractured Zone (DMZ) cells in mesh.

Args:
unstructuredGrid (vtkUnstructuredGrid): The mesh where to find DMZ cells.
region_array_name (str, optional): Name of the region array. Defaults to "attribute".
active_region_id (int, optional): ID of the active region. Defaults to 0.
active_fault_id (int, optional): ID of the active fault. Defaults to 0.
output_array_name (str, optional): Name of the output DMZ array. Defaults to "isDMZ".
dmz_length (float, optional): Distance for DMZ identification. Defaults to 100.0.
speHandler (bool, optional): True to use a specific handler, False to use the internal handler.
Defaults to False.
"""
self.unstructuredGrid: vtkUnstructuredGrid = unstructuredGrid
self.region_array_name: str = region_array_name
self.active_region_id: int = active_region_id
self.active_fault_id: int = active_fault_id
self.output_array_name: str = output_array_name
self.dmz_length: float = dmz_length

# Logger.
self.logger: Logger
if not speHandler:
self.logger = getLogger( loggerTitle, True )
else:
self.logger = logging.getLogger( loggerTitle )
self.logger.setLevel( logging.INFO )

def setLoggerHandler( self: Self, handler: logging.Handler ) -> None:
"""Set a specific handler for the filter logger.

In this filter 4 log levels are used, .info, .error, .warning and .critical,
be sure to have at least the same 4 levels.

Args:
handler (logging.Handler): The handler to add.
"""
if not self.logger.hasHandlers():
self.logger.addHandler( handler )
else:
self.logger.warning( "The logger already has a handler, to use yours set the argument 'speHandler' to True "
"during the filter initialization." )

def applyFilter( self: Self ) -> bool:
"""Apply the DMZ finding algorithm to the mesh.

Returns:
bool: True if calculation successfully ended, False otherwise.
"""
self.logger.info( f"Apply filter { self.logger.name }." )

input_grid = dsa.WrapDataObject( self.unstructuredGrid )

if not self._check_inputs( input_grid ):
self.logger.error( f"The filter { self.logger.name } failed." )
return False

# Get the array that defines the geological regions
region_array = input_grid.CellData[ self.region_array_name ]

all_mesh_face_ids: set[ int ] = find_2D_cell_ids( self.unstructuredGrid )
if not all_mesh_face_ids:
self.logger.error( "No 2D face cells found." )
self.logger.error( f"The filter { self.logger.name } failed." )
return False

number_cells: int = self.unstructuredGrid.GetNumberOfCells()
# identify which faces are in the active region
all_faces_mask = np.zeros( number_cells, dtype=bool )
all_faces_mask[ list( all_mesh_face_ids ) ] = True
active_region_mask = ( region_array == self.active_region_id )
active_fault_mask = ( region_array == self.active_fault_id )
active_fault_faces_mask = np.logical_and( all_faces_mask, active_fault_mask )
active_fault_faces: set[ int ] = set( np.where( active_fault_faces_mask )[ 0 ] )
cells_near_faces: set[ int ] = find_cells_near_faces( self.unstructuredGrid, active_fault_faces,
self.dmz_length )

# Identify which cells are in the DMZ
dmz_mask = np.zeros( number_cells, dtype=bool )
dmz_mask[ list( cells_near_faces ) ] = True
final_mask = np.logical_and( dmz_mask, active_region_mask )

# Create the new CellData array
new_region_id_array = np.zeros( region_array.shape, dtype=int )
new_region_id_array[ final_mask ] = 1

# Add new isDMZ array to the input_grid (which modifies the original mesh)
input_grid.CellData.append( new_region_id_array, self.output_array_name )

self.logger.info( f"The filter { self.logger.name } succeed." )
return True

def _check_inputs( self: Self, dsa_mesh: dsa.UnstructuredGrid ) -> bool:
"""Check if inputs are valid.

Args:
dsa_mesh (dsa.UnstructuredGrid): The wrapped mesh to check.

Returns:
bool: True if inputs are valid, False otherwise.
"""
if self.output_array_name == self.region_array_name:
self.logger.error( "Output array name cannot be the same as region array name." )
return False

if dsa_mesh is None:
self.logger.error( "Input mesh is not set." )
return False

region_array = dsa_mesh.CellData[ self.region_array_name ]
if region_array is None:
self.logger.error( f"Region array '{self.region_array_name}' is not found in input mesh." )
return False
else:
if self.active_region_id not in region_array:
self.logger.error( f"Active region ID '{self.active_region_id}' is not found in region array." )
return False

return True
73 changes: 70 additions & 3 deletions geos-mesh/src/geos/mesh/utils/genericHelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import numpy as np
import numpy.typing as npt
from typing import Iterator, List, Sequence, Any, Union
from vtkmodules.util.numpy_support import numpy_to_vtk
from vtkmodules.util.numpy_support import numpy_to_vtk, vtk_to_numpy
from vtkmodules.vtkCommonCore import vtkIdList, vtkPoints, reference
from vtkmodules.vtkCommonDataModel import vtkUnstructuredGrid, vtkMultiBlockDataSet, vtkPolyData, vtkDataSet, vtkDataObject, vtkPlane, vtkCellTypes, vtkIncrementalOctreePointLocator
from vtkmodules.vtkFiltersCore import vtk3DLinearGridPlaneCutter
from vtkmodules.vtkCommonDataModel import ( vtkUnstructuredGrid, vtkMultiBlockDataSet, vtkPolyData, vtkDataSet,
vtkDataObject, vtkPlane, vtkCellTypes, vtkIncrementalOctreePointLocator,
vtkKdTreePointLocator, VTK_QUAD, VTK_TRIANGLE )
from vtkmodules.vtkFiltersCore import vtk3DLinearGridPlaneCutter, vtkCellCenters
from geos.mesh.utils.multiblockHelpers import ( getBlockElementIndexesFlatten, getBlockFromFlatIndex )

__doc__ = """
Expand Down Expand Up @@ -53,6 +55,71 @@ def vtk_iter( vtkContainer: vtkIdList | vtkCellTypes ) -> Iterator[ Any ]:
yield vtkContainer.GetCellType( i )


def find_2D_cell_ids( mesh: vtkUnstructuredGrid ) -> set[ int ]:
"""
Find 2D cell IDs in a 3D unstructured grid.

Args:
mesh (vtkUnstructuredGrid): The input 3D mesh.

Returns:
set[int]: A set of 2D cell IDs.
"""
cell_types_vtk_array = mesh.GetCellTypesArray()
if not cell_types_vtk_array:
return set()

cell_types_numpy_array = vtk_to_numpy( cell_types_vtk_array )
triangle_ids = np.where( cell_types_numpy_array == VTK_TRIANGLE )[ 0 ]
quad_ids = np.where( cell_types_numpy_array == VTK_QUAD )[ 0 ]
return set( np.concatenate( ( triangle_ids, quad_ids ) ) )


def find_cells_near_faces( mesh: vtkUnstructuredGrid, face_ids: set[ int ], distance: float ) -> set[ int ]:
"""
Given a vtkUnstructuredGrid, a list of face cell IDs, and a distance, find unique cell IDs
whose centroids are within the distance to at least one face centroid.

Args:
mesh (vtkUnstructuredGrid)
face_ids (set[int]): cell IDs of the faces
distance (float): search radius

Returns:
set(int): unique cell IDs
"""
# Compute cell centers for all cells
cell_centers_filter = vtkCellCenters()
cell_centers_filter.SetInputData( mesh )
cell_centers_filter.Update()
centers_polydata: vtkPolyData = cell_centers_filter.GetOutput()

# Build kd-tree point locator on the cell centers
locator = vtkKdTreePointLocator()
locator.SetDataSet( centers_polydata )
locator.BuildLocator()

# Use a set to collect unique cell IDs
nearby_cells = set()

# For each face, get its centroid and find nearby points (cell IDs)
points: vtkPoints = centers_polydata.GetPoints()
for face_id in face_ids:
# Get face centroid
face_centroid = points.GetPoint( face_id )

# Find points within radius
result = vtkIdList()
locator.FindPointsWithinRadius( distance, face_centroid, result )

# Add to set
for j in range( result.GetNumberOfIds() ):
cell_id = result.GetId( j )
nearby_cells.add( cell_id )

return nearby_cells


def extractSurfaceFromElevation( mesh: vtkUnstructuredGrid, elevation: float ) -> vtkPolyData:
"""Extract surface at a constant elevation from a mesh.

Expand Down
Loading