[ ]:
[1]:
"""
project: CTU@GeoHarmonizer
date: 2021-07-19
author: lukas.brodsky@fsv.cvut.cz
purpose: manage geographic data input / output operations with GDAL/OGR
"""
[1]:
'\nproject: CTU@GeoHarmonizer\ndate: 2021-07-19\nauthor: lukas.brodsky@fsv.cvut.cz\npurpose: manage geographic data input / output operations with GDAL/OGR\n'
[2]:
import os
import glob
from osgeo import gdal
from osgeo import gdal_array
from osgeo import ogr
from osgeo import osr
from osgeo.gdalconst import *
---------------------------------------------------------------------------
ImportError Traceback (most recent call last)
Cell In[2], line 4
2 import glob
3 from osgeo import gdal
----> 4 from osgeo import gdal_array
5 from osgeo import ogr
6 from osgeo import osr
File /usr/local/lib/python3.11/site-packages/osgeo/gdal_array.py:13
11 # Import the low-level C/C++ module
12 if __package__ or "." in __name__:
---> 13 from . import _gdal_array
14 else:
15 import _gdal_array
ImportError: cannot import name '_gdal_array' from 'osgeo' (/usr/local/lib/python3.11/site-packages/osgeo/__init__.py)
[3]:
import numpy as np
[ ]:
[4]:
def get_image_attribute(image_filename):
""" Use GDAL to open image and return attributes
Args:
image_filename (str): image filename
Returns:
tuple: nrow (int), ncol (int), nband (int), NumPy datatype (type)
"""
try:
image_ds = gdal.Open(image_filename, gdal.GA_ReadOnly)
except Exception as e:
# logger.error('Could not open image dataset ({f}): {e}'
# .format(f=image_filename, e=str(e)))
print('Could not open image dataset.')
raise
nrow = image_ds.RasterYSize
ncol = image_ds.RasterXSize
nband = image_ds.RasterCount
dtype = gdal_array.GDALTypeCodeToNumericTypeCode(
image_ds.GetRasterBand(1).DataType)
return (nrow, ncol, nband, dtype)
[5]:
def get_image_geo_attributes(image_filename):
""" Use GDAL to open image and return geo-attributes
Args:
image_filename (str): image filename
Returns:
tuple: projection (str), geo_transform (tuple)
"""
try:
if os.path.isfile((image_filename)):
image_ds = gdal.Open(image_filename, gdal.GA_ReadOnly)
except Exception as e:
# logger.error('Could not open image dataset ({f}): {e}'
# .format(f=image_filename, e=str(e)))
print('Could not open image dataset.')
raise
projection = image_ds.GetProjection()
geo_transform = image_ds.GetGeoTransform()
return projection, geo_transform
[6]:
def read_image(image_filename, bands):
""" Use GDAL to open image and return geo-attributes
Args:
image_filename (str): image filename
bands (list): list of bands to read from image
Returns:
ndarray: img_arr [bnads, y, x]
"""
try:
if os.path.isfile((image_filename)):
image_ds = gdal.Open(image_filename, gdal.GA_ReadOnly)
except Exception as e:
# logger.error('Could not open image dataset ({f}): {e}'
# .format(f=image_filename, e=str(e)))
print('Could not open image dataset.')
raise
# assert(image_ds)
rb = image_ds.GetRasterBand(1)
b1 = src_array = rb.ReadAsArray()
img_arr = np.zeros((len(bands), b1.shape[0], b1.shape[1]), dtype=np.float)
# img_arr[img_arr == 0] = np.nan
b = 0
for band in bands:
data_b = image_ds.GetRasterBand(band)
img_arr[b, :, :] = data_b.ReadAsArray()
b += 1
return img_arr
[7]:
# vector_file_name = in_vec
def read_vector(vector_filename):
"""
:param vector_file_name:
:return:
"""
print(vector_filename)
try:
vds = ogr.Open(vector_filename, gdal.OF_VECTOR)
except Exception as e:
# logger.error('Could not open vector dataset ({f}): {e}'
# .format(f=image_filename, e=str(e)))
print('Could not open vector dataset.')
raise
vlyr = vds.GetLayer()
return vlyr
[8]:
def create_vector(overlay_data, file_name, ogr_format, epsg):
"""Create a vector file from overlay data
"""
drv = ogr.GetDriverByName(ogr_format)
vector_ds = drv.CreateDataSource(file_name)
srs = osr.SpatialReference()
srs.ImportFromEPSG(epsg)
# create the layer
layer_name = os.path.basename(file_name).split('.')[0]
layer = vector_ds.CreateLayer(layer_name, srs, ogr.wkbPoint)
# Add the attribute fields
attributes = list(overlay_data[0].keys())
if 'id' in attributes:
field_name = ogr.FieldDefn("id", ogr.OFTInteger)
layer.CreateField(field_name)
if 'x' in attributes:
field_x = ogr.FieldDefn("x", ogr.OFTReal)
layer.CreateField(field_x)
if 'y' in attributes:
field_y = ogr.FieldDefn("y", ogr.OFTReal)
layer.CreateField(field_y)
if 'reference' in attributes:
field_reference = ogr.FieldDefn("reference", ogr.OFTInteger)
layer.CreateField(field_reference)
if 'map' in attributes:
field_map = ogr.FieldDefn("map", ogr.OFTInteger)
layer.CreateField(field_map)
if 'status' in attributes:
field_status = ogr.FieldDefn("status", ogr.OFTInteger)
layer.CreateField(field_status)
# Process the attributes and features to the vector
# item = overlay_data[1]
for item in overlay_data:
# print(item)
# create the feature
feature = ogr.Feature(layer.GetLayerDefn())
# Set the attributes
if item['status'] is not None:
if item['id'] is not None:
feature.SetField('id', item['id'])
if item['x'] is not None:
feature.SetField('x', item['x'])
if item['y'] is not None:
feature.SetField('y', item['y'])
if item['reference'] is not None:
feature.SetField('reference', int(item['reference']))
if item['map'] is not None:
feature.SetField('map', int(item['map']))
if item['status'] is not None:
if item['status']:
feature.SetField('status', 1)
else:
feature.SetField('status', 0)
# create the WKT for the feature using Python string formatting
wkt = "POINT(%f %f)" % (float(item['x']), float(item['y']))
# Create the point from the Well Known Txt
point = ogr.CreateGeometryFromWkt(wkt)
# Set the feature geometry using the point
feature.SetGeometry(point)
# Create the feature in the layer (shapefile)
layer.CreateFeature(feature)
# Dereference the feature
feature = None
vector_ds = None
return 0
[ ]:
[9]:
def create_template_vector(vector_filename, ogr_format, epsg, example_vector, new_att):
"""
:param vector_fn:
:return:
"""
if os.path.isfile(vector_filename):
# os.remove(vector_filename)
for fn in glob.glob(vector_filename.split('.')[0] + '*'):
os.remove(fn)
# create
driver = ogr.GetDriverByName(ogr_format)
if driver is None:
print('%s driver not available.\n' % ogr_format)
dataset_out = driver.CreateDataSource(vector_filename)
if dataset_out is None:
print('Creation of output file failed.\n')
sr = osr.SpatialReference()
sr.ImportFromEPSG(epsg)
layer_name = os.path.basename(vector_filename).split('.')[0]
outlayer = dataset_out.CreateLayer(layer_name, geom_type=ogr.wkbPoint, srs=sr)
# setup attributes
example_ds = driver.Open(example_vector, 0)
if example_ds is None:
print('Could not open %s' % (example_vector))
else:
example_layer = example_ds.GetLayer()
layer_definition = example_layer.GetLayerDefn()
for i in range(layer_definition.GetFieldCount()):
field_def = layer_definition.GetFieldDefn(i)
if outlayer.CreateField(field_def) != 0:
print('Creating %s field.\n' % field_def.GetNameRef())
# Add a field for map value TODO: integer to 16b.?
if new_att is not None:
new_field = ogr.FieldDefn(new_att, ogr.OFTInteger)
outlayer.CreateField(new_field)
# TODO: better closing
outlayer = None
example_layer = None
return 0
[10]:
def check_read_raster(image_filename):
""" Use GDAL to check open raster image
Args:
image_filename (str): image filename
Returns:
bool: valid (boolean)
"""
valid = False
image_ds = None
try:
image_ds = gdal.Open(image_filename, gdal.GA_ReadOnly)
if image_ds is not None:
valid = True
except RuntimeError as e:
print('Could not open image dataset.')
raise
image_ds = None
return valid
[11]:
def check_read_vector(vector_filename):
""" Use GDAL to check open raster image
Args:
image_filename (str): image filename
Returns:
bool: valid (boolean)
"""
valid = False
vector_ds = None
if os.path.basename(vector_filename).split('.')[-1] == 'shp':
ogr_format = 'ESRI Shapefile'
elif os.path.basename(vector_filename).split('.')[-1] == 'gpkg':
ogr_format = 'GPKG'
driver = ogr.GetDriverByName(ogr_format)
if driver is None:
print('%s driver not available.\n' % ogr_format)
try:
vector_ds = driver.Open(vector_filename, 0)
if vector_ds is not None:
valid = True
except RuntimeError as e:
print('Could not open image dataset.')
raise
return valid
[ ]: