Source code for vyperdatum.transformer

import os
import pathlib
import logging
from typing import Union, Optional
import pyproj as pp
from pyproj.transformer import TransformerGroup
from pyproj._transformer import AreaOfInterest
import numpy as np
from osgeo import gdal, osr, ogr
from tqdm import tqdm
from vyperdatum.utils import raster_utils


logger = logging.getLogger("root_logger")
gdal.UseExceptions()


[docs] class Transformer(): """ """ def __init__(self, crs_from: Union[pp.CRS, int, str], crs_to: Union[pp.CRS, int, str], always_xy: bool = False, area_of_interest: Optional[AreaOfInterest] = None, authority: Optional[str] = None, accuracy: Optional[float] = None, allow_ballpark: Optional[bool] = False, force_over: bool = False, only_best: Optional[bool] = True ) -> None: """ .. Some of the parameter descriptions adopted from pyproj :class:`Transformer` Parameters ---------- crs_from: pyproj.crs.CRS or input used to create one Projection of input data. crs_to: pyproj.crs.CRS or input used to create one Projection of output data. always_xy: bool, default=False If true, the transform method will accept as input and return as output coordinates using the traditional GIS order, that is longitude, latitude for geographic CRS and easting, northing for most projected CRS. area_of_interest: :class:`.AreaOfInterest`, optional The area of interest to help select the transformation. authority: str, optional When not specified, coordinate operations from any authority will be searched, with the restrictions set in the authority_to_authority_preference database table related to the authority of the source/target CRS themselves. If authority is set to “any”, then coordinate operations from any authority will be searched. If authority is a non-empty string different from "any", then coordinate operations will be searched only in that authority namespace (e.g. EPSG). accuracy: float, optional The minimum desired accuracy (in metres) of the candidate coordinate operations. allow_ballpark: bool, optional, default=False Set to False to disallow the use of Ballpark transformation in the candidate coordinate operations. Default is to allow. force_over: bool, default=False If True, it will to force the +over flag on the transformation. Requires PROJ 9+. only_best: bool, optional, default=True Can be set to True to cause PROJ to error out if the best transformation known to PROJ and usable by PROJ if all grids known and usable by PROJ were accessible, cannot be used. Best transformation should be understood as the transformation returned by :c:func:`proj_get_suggested_operation` if all known grids were accessible (either locally or through network). Note that the default value for this option can be also set with the :envvar:`PROJ_ONLY_BEST_DEFAULT` environment variable, or with the ``only_best_default`` setting of :ref:`proj-ini`. The only_best kwarg overrides the default value if set. Requires PROJ 9.2+. """ if not isinstance(crs_from, pp.CRS): crs_from = pp.CRS(crs_from) if not isinstance(crs_to, pp.CRS): crs_to = pp.CRS(crs_to) self.crs_from = crs_from self.crs_to = crs_to self.transformer_group = TransformerGroup(crs_from=self.crs_from, crs_to=self.crs_to, allow_ballpark=allow_ballpark ) if len(self.transformer_group.transformers) > 0: print(f"Found {len(self.transformer_group.transformers)} transformer(s) for" f"\n\tcrs_from: {self.crs_from.name}\n\tcrs_to: {self.crs_to.name}") else: err_msg = ("No transformers identified for the following transformation:" f"\n\tcrs_from: {self.crs_from.name}\n\tcrs_to: {self.crs_to.name}") logger.exception(err_msg) raise NotImplementedError(err_msg) self.transformer = pp.Transformer.from_crs(crs_from=self.crs_from, crs_to=self.crs_to, always_xy=always_xy, area_of_interest=area_of_interest, authority=authority, accuracy=accuracy, allow_ballpark=allow_ballpark, force_over=force_over, only_best=only_best ) if not self.transformer.has_inverse: logger.warning("No inverse transformer has defined!")
[docs] def transform_points(self, x: Union[float, int, list, np.ndarray], y: Union[float, int, list, np.ndarray], z: Union[float, int, list, np.ndarray], ): """ Conduct point transformation between two coordinate reference systems. Parameters ---------- x: numeric scalar or array Input x coordinate(s). y: numeric scalar or array Input y coordinate(s). z: numeric scalar or array, optional Input z coordinate(s). """ try: xt, yt, zt = None, None, None xt, yt, zt = self.transformer.transform(x, y, z) except Exception: logger.exception("Error while running the point transformation.") return xt, yt, zt
[docs] @staticmethod def gdal_extensions() -> list[str]: """ Return a lower-cased list of driver names supported by gdal. Returns ------- list[str] """ return sorted( ["." + gdal.GetDriver(i).ShortName.lower() for i in range(gdal.GetDriverCount())] + [".tif", ".tiff"] )
[docs] def transform_raster(self, input_file: str, output_file: str, apply_vertical: bool, overview: bool = False, warp_kwargs: Optional[dict] = None, final_step: bool = False, ) -> bool: """ Transform the gdal-supported input rater file (`input_file`) and store the transformed file on the local disk (`output_file`). Raises ------- ValueError: If `.crs_input` or `.crs_output` is not set. FileNotFoundError: If the input raster file is not found. NotImplementedError: If the input vector file is not supported by gdal. Parameters ----------- input_file: str Path to the input raster file (gdal supported). output_file: str Path to the transformed raster file. apply_vertical: bool Apply GDAL vertical shift. output_file: str Path to the transformed raster file. overview: bool, default=True If True, overview bands are added to the output raster file (only GTiff support). final_step: bool True for the final step of a multi-step transformations, otherwise False. Returns -------- bool: True if successful, otherwise False. """ if not (isinstance(self.crs_from, pp.CRS) & isinstance(self.crs_to, pp.CRS)): raise ValueError(("The `.crs_input` and `.crs_output` attributes" "must be set with `pyproj.CRS` type values.") ) if not os.path.isfile(input_file): raise FileNotFoundError(f"The input raster file not found at {input_file}.") if pathlib.Path(input_file).suffix.lower() not in self.gdal_extensions(): raise NotImplementedError(f"{pathlib.Path(input_file).suffix} is not supported") try: success = False input_metadata = raster_utils.raster_metadata(input_file) raster_utils.pre_transformation_checks(source_meta=input_metadata) raster_utils.warp(input_file=input_file, output_file=output_file, apply_vertical=apply_vertical, crs_from=self.crs_from, crs_to=self.crs_to, input_metadata=input_metadata, warp_kwargs=warp_kwargs ) output_metadata = raster_utils.raster_metadata(output_file) raster_utils.post_transformation_checks(source_meta=input_metadata, target_meta=output_metadata, target_crs=self.crs_to, vertical_transform=apply_vertical ) # if apply_vertical and isinstance(warp_kwargs.get("srcBands"), list): # raster_utils.unchanged_to_nodata(src_raster_file=input_file, # xform_raster_file=output_file, # xform_band=warp_kwargs.get("srcBands")[0]) if overview and input_metadata["driver"].lower() == "gtiff": raster_utils.add_overview(raster_file=output_file, compression=input_metadata["compression"] ) # raster_utils.add_rat(output_file) success = True finally: return success
[docs] def transform_vector(self, input_file: str, output_file: str ) -> bool: """ Transform the gdal-supported input vector file (`input_file`) and store the transformed file on the local disk (`output_file`). Raises ------- ValueError: If `.crs_input` or `.crs_output` is not set. FileNotFoundError: If the input vector file is not found. NotImplementedError: If the input vector file is not supported by gdal. Parameters ----------- input_file: str Path to the input vector file (gdal supported). output_file: str Path to the transformed vector file. Returns -------- bool: True if successful, otherwise False. """ try: if not (isinstance(self.crs_from, pp.CRS) & isinstance(self.crs_to, pp.CRS)): raise ValueError("The `.crs_input` and `.crs_output` attributes" "must be set with `pyproj.CRS` type values." ) if not os.path.isfile(input_file): raise FileNotFoundError(f"The input vector file not found at {input_file}.") if pathlib.Path(input_file).suffix.lower() not in self.gdal_extensions(): raise NotImplementedError(f"{pathlib.Path(input_file).suffix} is not supported") pbar, success = None, False ds = gdal.OpenEx(input_file) driver = ogr.GetDriverByName(ds.GetDriver().ShortName) inSpatialRef = osr.SpatialReference() inSpatialRef.ImportFromWkt(self.crs_from.to_wkt()) outSpatialRef = osr.SpatialReference() outSpatialRef.ImportFromWkt(self.crs_to.to_wkt()) coordTrans = osr.CoordinateTransformation(inSpatialRef, outSpatialRef) inDataSet = driver.Open(input_file) if os.path.exists(output_file): driver.DeleteDataSource(output_file) outDataSet = driver.CreateDataSource(output_file) layer_count = inDataSet.GetLayerCount() for layer_index in range(layer_count): inLayer = inDataSet.GetLayer(layer_index) outLayer = outDataSet.CreateLayer(inLayer.GetName(), geom_type=ogr.wkbMultiPolygon) inLayerDefn = inLayer.GetLayerDefn() for i in range(0, inLayerDefn.GetFieldCount()): fieldDefn = inLayerDefn.GetFieldDefn(i) outLayer.CreateField(fieldDefn) outLayerDefn = outLayer.GetLayerDefn() inFeature = inLayer.GetNextFeature() feature_count = inLayer.GetFeatureCount() pbar = tqdm(total=feature_count) feature_counter = 0 while inFeature: geom = inFeature.GetGeometryRef() geom.Transform(coordTrans) outFeature = ogr.Feature(outLayerDefn) outFeature.SetGeometry(geom) for i in range(0, outLayerDefn.GetFieldCount()): outFeature.SetField(outLayerDefn.GetFieldDefn(i).GetNameRef(), inFeature.GetField(i)) outLayer.CreateFeature(outFeature) outFeature = None inFeature = inLayer.GetNextFeature() feature_counter += 1 pbar.update(1) pbar.set_description(f"Processing Layer {layer_index+1} / {layer_count}") inDataSet, outDataSet, ds = None, None, None success = True finally: if pbar: pbar.close() return success