
Commit

debug mac multiprocessing error
ArcticSnow committed Feb 26, 2024
1 parent 04b6457 commit 85492f1
Showing 1 changed file with 155 additions and 154 deletions.
309 changes: 155 additions & 154 deletions TopoPyScale/topo_scale.py
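The change itself is a refactor: `pt_downscale_interp` moves out of `downscale_climate` up to module level. On macOS, `multiprocessing` defaults to the `spawn` start method, which pickles the worker function by reference; functions defined inside another function cannot be pickled, so pooling over the nested version fails. A minimal standalone sketch of the failure mode (illustrative code, not part of TopoPyScale):

```python
import multiprocessing as mp

def module_level_worker(x):
    # picklable: spawn-based pools re-import this by its qualified name
    return x * 2

def broken_on_mac():
    def nested_worker(x):  # a local object, so not picklable
        return x * 2
    with mp.Pool(2) as pool:
        # raises "AttributeError: Can't pickle local object ..." under spawn
        return pool.map(nested_worker, [1, 2, 3])

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        print(pool.map(module_level_worker, [1, 2, 3]))  # [2, 4, 6]
```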
@@ -65,6 +65,161 @@ def clear_files(path: Union[Path, str]):
file.unlink()
print(f'{path} cleaned')

def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):
pt_id = row.point_name
print(f'Downscaling t, q, p, tp, ws, wd for point: {pt_id}')

# ====== Horizontal interpolation ====================
interp_method = meta.get('interp_method')

# convert grid-cell coordinates from WGS84 to the DEM projection
lons, lats = np.meshgrid(ds_plev_pt.longitude.values, ds_plev_pt.latitude.values)
trans = Transformer.from_crs("epsg:4326", "epsg:" + str(target_EPSG), always_xy=True)
Xs, Ys = trans.transform(lons.flatten(), lats.flatten())
Xs = Xs.reshape(lons.shape)
Ys = Ys.reshape(lons.shape)

dist = np.sqrt((row.x - Xs) ** 2 + (row.y - Ys) ** 2)
if interp_method == 'idw':
idw = 1 / (dist ** 2)
weights = idw / np.sum(idw)  # normalize so that the weights sum to 1
elif interp_method == 'linear':
weights = dist / np.sum(dist)  # NOTE: weights grow with distance here, unlike 'idw'
else:
sys.exit('ERROR: interpolation method not available')

# create a DataArray of weights to propagate through the dataset
da_idw = xr.DataArray(data=weights,
coords={
"latitude": ds_plev_pt.latitude.values,
"longitude": ds_plev_pt.longitude.values,
},
dims=["latitude", "longitude"]
)
plev_interp = ds_plev_pt.weighted(da_idw).sum(['longitude', 'latitude'],
keep_attrs=True)  # horizontal inverse-distance-weighted interpolation
surf_interp = ds_surf_pt.weighted(da_idw).sum(['longitude', 'latitude'],
keep_attrs=True)  # horizontal inverse-distance-weighted interpolation

# ========= Converting z from [m**2 s**-2] to [m] asl =======
# plev_interp.z.values = plev_interp.z.values / g
# plev_interp.z.attrs = {'units': 'm', 'standard_name': 'Elevation', 'Long_name': 'Elevation of plevel'}

# surf_interp.z.values = surf_interp.z.values / g # convert geopotential height to elevation (in m), normalizing by g
# surf_interp.z.attrs = {'units': 'm', 'standard_name': 'Elevation', 'Long_name': 'Elevation of ERA5 surface'}

# ============ Extract specific humidity (q) for both datasets ============
surf_interp = mu.dewT_2_q_magnus(surf_interp, mu.var_era_surf)
plev_interp = mu.t_rh_2_dewT(plev_interp, mu.var_era_plevel)

down_pt = xr.Dataset(coords={
'time': plev_interp.time,
#'point_name': pd.to_numeric(pt_id, errors='ignore')
'point_name': pt_id
})

if (row.elevation < plev_interp.z.isel(level=-1)).sum():
# warn but continue: sample from the lowest pressure level, no vertical interpolation
print("---> WARNING: Point {} is {} m lower than the {} hPa geopotential\n=> "
"Values sampled from Psurf and lowest Plevel. No vertical interpolation".format(pt_id,
np.round(np.min(row.elevation - plev_interp.z.isel(level=-1).values), 0),
plev_interp.isel(level=-1).level.data))
ind_z_top = (plev_interp.where(plev_interp.z > row.elevation).z - row.elevation).argmin('level')
top = plev_interp.isel(level=ind_z_top)

down_pt['t'] = top['t']
down_pt['u'] = top.u
down_pt['v'] = top.v
down_pt['q'] = top.q
down_pt['p'] = top.level * (10 ** 2) * np.exp(
-(row.elevation - top.z) / (0.5 * (top.t + down_pt.t) * R / g))  # hypsometric equation; pressure in Pa

else:
# ========== Vertical interpolation at the DEM surface z ===============
ind_z_bot = (plev_interp.where(plev_interp.z < row.elevation).z - row.elevation).argmax('level')

# In case upper pressure level elevation is not high enough, stop process and print error
try:
ind_z_top = (plev_interp.where(plev_interp.z > row.elevation).z - row.elevation).argmin('level')
except Exception:
raise ValueError(
f'ERROR: Upper pressure level {plev_interp.level.min().values} hPa geopotential is lower than cluster mean elevation')

top = plev_interp.isel(level=ind_z_top)
bot = plev_interp.isel(level=ind_z_bot)

# ====== Weights for vertical linear interpolation =======================
# each level gets the normalized distance to the *other* level as its weight,
# hence weights[1] multiplies bot and weights[0] multiplies top below
dist = np.array([np.abs(bot.z - row.elevation).values, np.abs(top.z - row.elevation).values])
# idw = 1/dist**2
# weights = idw / np.sum(idw, axis=0)
weights = dist / np.sum(dist, axis=0)

# ============ Creating a dataset containing the downscaled timeseries ========================
down_pt['t'] = bot.t * weights[1] + top.t * weights[0]
down_pt['u'] = bot.u * weights[1] + top.u * weights[0]
down_pt['v'] = bot.v * weights[1] + top.v * weights[0]
down_pt['q'] = bot.q * weights[1] + top.q * weights[0]
down_pt['p'] = top.level * (10 ** 2) * np.exp(
-(row.elevation - top.z) / (0.5 * (top.t + down_pt.t) * R / g))  # hypsometric equation; pressure in Pa

# ======= compute tp, ws and wd lazily, without loading data into memory (keeps the dask graph intact)
down_pt['month'] = ('time', down_pt.time.dt.month.data)
print(f'precip lapse rate = {precip_lapse_rate_flag}')
if precip_lapse_rate_flag:
monthly_coeffs = xr.Dataset(
{
'coef': (['month'], [0.35, 0.35, 0.35, 0.3, 0.25, 0.2, 0.2, 0.2, 0.2, 0.25, 0.3, 0.35])
},
coords={'month': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]}
)
down_pt['precip_lapse_rate'] = (1 + monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
row.elevation - surf_interp.z) * 1e-3) / \
(1 - monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
row.elevation - surf_interp.z) * 1e-3)
else:
down_pt['precip_lapse_rate'] = down_pt.t * 0 + 1

down_pt['tp'] = down_pt.precip_lapse_rate * surf_interp.tp / meta.get('tstep') * 10 ** 3  # convert m per time step to mm/hr
down_pt['theta'] = np.arctan2(-down_pt.u, -down_pt.v)  # angle the wind blows from
down_pt['theta_neg'] = (down_pt.theta < 0) * (down_pt.theta + 2 * np.pi)
down_pt['theta_pos'] = (down_pt.theta >= 0) * down_pt.theta
down_pt = down_pt.drop('theta')
down_pt['wd'] = (down_pt.theta_pos + down_pt.theta_neg)  # wind direction in radians, in [0, 2*pi)
down_pt['ws'] = np.sqrt(down_pt.u ** 2 + down_pt.v ** 2)  # wind speed
down_pt = down_pt.drop(['theta_pos', 'theta_neg', 'month'])

down_pt.t.attrs = {'units': 'K', 'long_name': 'Temperature', 'standard_name': 'air_temperature'}
down_pt.q.attrs = {'units': 'kg kg**-1', 'long_name': 'Specific humidity', 'standard_name': 'specific_humidity'}
down_pt.u.attrs = {'units': 'm s**-1', 'long_name': 'U component of wind', 'standard_name': 'eastward_wind'}
down_pt.v.attrs = {'units': 'm s**-1', 'long_name': 'V component of wind', 'standard_name': 'northward_wind'}
down_pt.p.attrs = {'units': 'Pa', 'long_name': 'Atmospheric pressure', 'standard_name': 'air_pressure'}

down_pt.ws.attrs = {'units': 'm s**-1', 'long_name': 'Wind speed', 'standard_name': 'wind_speed'}
down_pt.wd.attrs = {'units': 'rad', 'long_name': 'Wind direction', 'standard_name': 'wind_from_direction'}
down_pt.tp.attrs = {'units': 'mm hr**-1', 'long_name': 'Precipitation', 'standard_name': 'precipitation'}
down_pt.precip_lapse_rate.attrs = {'units': '1',
'long_name': 'Precipitation lapse-rate correction factor',
'standard_name': 'precipitation_lapse_rate_correction'}

down_pt.attrs = {'title': 'Downscaled point timeseries using TopoPyScale',
'date_created': dt.datetime.now().strftime('%Y/%m/%d %H:%M:%S')}

print(f'---> Storing point {pt_id} to {output_directory.name}/tmp/')

comp = dict(zlib=True, complevel=5)
encoding = {var: comp for var in down_pt.data_vars}
down_pt.to_netcdf(output_directory / f'tmp/down_pt_{pt_id}.nc',
engine='h5netcdf', encoding=encoding)

encoding = {var: comp for var in surf_interp.data_vars}
surf_interp.to_netcdf(output_directory / f'tmp/surf_interp_{pt_id}.nc',
engine='h5netcdf', encoding=encoding)
# release references so the worker frees memory promptly
down_pt = None
surf_interp = None
top, bot = None, None
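For reference, the horizontal weighting step above can be reproduced in isolation. A minimal sketch with made-up grid coordinates and temperatures (the 2x2 grid, point location, and values are placeholders, not project data):

```python
import numpy as np
import xarray as xr

# toy 2x2 grid of projected cell centres [m] and one target point (made-up values)
Xs = np.array([[0.0, 1000.0], [0.0, 1000.0]])
Ys = np.array([[0.0, 0.0], [1000.0, 1000.0]])
x_pt, y_pt = 200.0, 300.0

dist = np.sqrt((x_pt - Xs) ** 2 + (y_pt - Ys) ** 2)
idw = 1 / dist ** 2
weights = idw / idw.sum()  # normalize so the weights sum to 1

da_w = xr.DataArray(weights, dims=["latitude", "longitude"])
ds = xr.Dataset({"t": (("latitude", "longitude"),
                       np.array([[270.0, 271.0], [272.0, 273.0]]))})
t_interp = ds.weighted(da_w).sum(["latitude", "longitude"])
print(float(t_interp.t))  # ~270.6 K: the nearest cell dominates
```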

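The vertical step can likewise be checked with scalar stand-ins. A worked sketch of the bracketing-level interpolation and the hypsometric pressure reconstruction used above (all values assumed; R and g as typically defined for dry air and gravity):

```python
import numpy as np

R, g = 287.05, 9.81            # dry-air gas constant [J kg-1 K-1], gravity [m s-2] (assumed)
z_bot, z_top = 1500.0, 2800.0  # elevations of the bracketing pressure levels [m]
t_bot, t_top = 280.0, 272.0    # temperatures at those levels [K]
p_top = 700.0                  # upper bracketing level [hPa]
z_pt = 2100.0                  # point elevation [m]

dist = np.array([abs(z_bot - z_pt), abs(z_top - z_pt)])
weights = dist / dist.sum()    # weights[0]: distance to bot, weights[1]: distance to top
t_pt = t_bot * weights[1] + t_top * weights[0]  # nearer level gets the larger weight

# step down from the top level to the point elevation -> pressure in Pa
p_pt = p_top * 10 ** 2 * np.exp(-(z_pt - z_top) / (0.5 * (t_top + t_pt) * R / g))
print(round(float(t_pt), 1), round(float(p_pt)))  # 276.3 K, ~76383 Pa (~764 hPa)
```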

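Similarly, the u/v-to-wind conversion maps the components to meteorological direction with arctan2 of the negated components, then folds negative angles into [0, 2*pi). A standalone check of that convention with scalar inputs (illustrative only; the dataset version operates lazily on DataArrays):

```python
import numpy as np

def wind_from_uv(u, v):
    theta = np.arctan2(-u, -v)  # 0 rad = wind blowing from the north
    wd = theta + 2 * np.pi if theta < 0 else theta  # fold into [0, 2*pi)
    ws = np.sqrt(u ** 2 + v ** 2)
    return float(np.degrees(wd)), float(ws)

# a pure westerly (u=10, v=0) blows *from* the west -> 270 deg
print(wind_from_uv(10.0, 0.0))  # (270.0, 10.0)
# a pure southerly (u=0, v=5) blows *from* the south -> 180 deg
print(wind_from_uv(0.0, 5.0))   # (180.0, 5.0)
```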
def downscale_climate(project_directory,
climate_directory,
@@ -217,161 +372,7 @@ def _subset_climate_dataset(ds_, row, type='plev'):
'file_pattern': file_pattern})
i+=1

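For context, the monthly precipitation adjustment used in `pt_downscale_interp` applies the correction factor (1 + chi*dz)/(1 - chi*dz), with dz in km and monthly coefficients chi that appear to follow the Liston and Elder (2006) lapse-rate table. A worked scalar example (values assumed):

```python
import numpy as np

# monthly coefficients chi [km-1] copied from the function above (Jan..Dec)
chi = np.array([0.35, 0.35, 0.35, 0.3, 0.25, 0.2,
                0.2, 0.2, 0.2, 0.25, 0.3, 0.35])

def precip_factor(month, dz_m):
    # multiplicative correction for a point dz_m metres above the grid surface
    c = chi[month - 1] * dz_m * 1e-3  # chi times elevation difference in km
    return (1 + c) / (1 - c)

# a point 500 m above the ERA5 surface elevation in January: ~42% more precipitation
print(round(precip_factor(1, 500.0), 3))  # 1.424
```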
def pt_downscale_interp(row, ds_plev_pt, ds_surf_pt, meta):
pt_id = row.point_name
print(f'Downscaling t,q,p,tp,ws, wd for point: {pt_id}')

# ====== Horizontal interpolation ====================
interp_method = meta.get('interp_method')

# convert gridcells coordinates from WGS84 to DEM projection
lons, lats = np.meshgrid(ds_plev_pt.longitude.values, ds_plev_pt.latitude.values)
trans = Transformer.from_crs("epsg:4326", "epsg:" + str(target_EPSG), always_xy=True)
Xs, Ys = trans.transform(lons.flatten(), lats.flatten())
Xs = Xs.reshape(lons.shape)
Ys = Ys.reshape(lons.shape)

dist = np.sqrt((row.x - Xs) ** 2 + (row.y - Ys) ** 2)
if interp_method == 'idw':
idw = 1 / (dist ** 2)
weights = idw / np.sum(idw) # normalize idw to sum(idw) = 1
elif interp_method == 'linear':
weights = dist / np.sum(dist)
else:
sys.exit('ERROR: interpolation method not available')

# create a dataArray of weights to then propagate through the dataset
da_idw = xr.DataArray(data=weights,
coords={
"latitude": ds_plev_pt.latitude.values,
"longitude": ds_plev_pt.longitude.values,
},
dims=["latitude", "longitude"]
)
dw = xr.Dataset.weighted(ds_plev_pt, da_idw)
plev_interp = dw.sum(['longitude', 'latitude'],
keep_attrs=True) # compute horizontal inverse weighted horizontal interpolation
dww = xr.Dataset.weighted(ds_surf_pt, da_idw)
surf_interp = dww.sum(['longitude', 'latitude'],
keep_attrs=True) # compute horizontal inverse weighted horizontal interpolation

# ========= Converting z from [m**2 s**-2] to [m] asl =======
# plev_interp.z.values = plev_interp.z.values / g
# plev_interp.z.attrs = {'units': 'm', 'standard_name': 'Elevation', 'Long_name': 'Elevation of plevel'}

# surf_interp.z.values = surf_interp.z.values / g # convert geopotential height to elevation (in m), normalizing by g
# surf_interp.z.attrs = {'units': 'm', 'standard_name': 'Elevation', 'Long_name': 'Elevation of ERA5 surface'}

# ============ Extract specific humidity (q) for both dataset ============
surf_interp = mu.dewT_2_q_magnus(surf_interp, mu.var_era_surf)
plev_interp = mu.t_rh_2_dewT(plev_interp, mu.var_era_plevel)

down_pt = xr.Dataset(coords={
'time': plev_interp.time,
#'point_name': pd.to_numeric(pt_id, errors='ignore')
'point_name': pt_id
})

if (row.elevation < plev_interp.z.isel(level=-1)).sum():
raise Warning("---> WARNING: Point {} is {} m lower than the {} hPa geopotential\n=> "
"Values sampled from Psurf and lowest Plevel. No vertical interpolation".
format(i,
np.round(np.min(row.elevation - plev_interp.z.isel(level=-1).values), 0),
plev_interp.isel(level=-1).level.data))
ind_z_top = (plev_interp.where(plev_interp.z > row.elevation).z - row.elevation).argmin('level')
top = plev_interp.isel(level=ind_z_top)

down_pt['t'] = top['t']
down_pt['u'] = top.u
down_pt['v'] = top.v
down_pt['q'] = top.q
down_pt['p'] = top.level * (10 ** 2) * np.exp(
-(row.elevation - top.z) / (0.5 * (top.t + down_pt.t) * R / g)) # Pressure in bar

else:
# ========== Vertical interpolation at the DEM surface z ===============
ind_z_bot = (plev_interp.where(plev_interp.z < row.elevation).z - row.elevation).argmax('level')

# In case upper pressure level elevation is not high enough, stop process and print error
try:
ind_z_top = (plev_interp.where(plev_interp.z > row.elevation).z - row.elevation).argmin('level')
except:
raise ValueError(
f'ERROR: Upper pressure level {plev_interp.level.min().values} hPa geopotential is lower than cluster mean elevation')

top = plev_interp.isel(level=ind_z_top)
bot = plev_interp.isel(level=ind_z_bot)

# Preparing interpolation weights for linear interpolation =======================
dist = np.array([np.abs(bot.z - row.elevation).values, np.abs(top.z - row.elevation).values])
# idw = 1/dist**2
# weights = idw / np.sum(idw, axis=0)
weights = dist / np.sum(dist, axis=0)

# ============ Creating a dataset containing the downscaled timeseries ========================
down_pt['t'] = bot.t * weights[1] + top.t * weights[0]
down_pt['u'] = bot.u * weights[1] + top.u * weights[0]
down_pt['v'] = bot.v * weights[1] + top.v * weights[0]
down_pt['q'] = bot.q * weights[1] + top.q * weights[0]
down_pt['p'] = top.level * (10 ** 2) * np.exp(
-(row.elevation - top.z) / (0.5 * (top.t + down_pt.t) * R / g)) # Pressure in bar

# ======= logic to compute ws, wd without loading data in memory, and maintaining the power of dask
down_pt['month'] = ('time', down_pt.time.dt.month.data)
print("precip lapse rate = " + str(precip_lapse_rate_flag))
if precip_lapse_rate_flag:
monthly_coeffs = xr.Dataset(
{
'coef': (['month'], [0.35, 0.35, 0.35, 0.3, 0.25, 0.2, 0.2, 0.2, 0.2, 0.25, 0.3, 0.35])
},
coords={'month': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]}
)
down_pt['precip_lapse_rate'] = (1 + monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
row.elevation - surf_interp.z) * 1e-3) / \
(1 - monthly_coeffs.coef.sel(month=down_pt.month.values).data * (
row.elevation - surf_interp.z) * 1e-3)
else:
down_pt['precip_lapse_rate'] = down_pt.t * 0 + 1

down_pt['tp'] = down_pt.precip_lapse_rate * surf_interp.tp * 1 / meta.get('tstep') * 10 ** 3 # Convert to mm/hr
down_pt['theta'] = np.arctan2(-down_pt.u, -down_pt.v)
down_pt['theta_neg'] = (down_pt.theta < 0) * (down_pt.theta + 2 * np.pi)
down_pt['theta_pos'] = (down_pt.theta >= 0) * down_pt.theta
down_pt = down_pt.drop('theta')
down_pt['wd'] = (down_pt.theta_pos + down_pt.theta_neg) # direction in Rad
down_pt['ws'] = np.sqrt(down_pt.u ** 2 + down_pt.v ** 2)
down_pt = down_pt.drop(['theta_pos', 'theta_neg', 'month'])

down_pt.t.attrs = {'units': 'K', 'long_name': 'Temperature', 'standard_name': 'air_temperature'}
down_pt.q.attrs = {'units': 'kg kg**-1', 'long_name': 'Specific humidity', 'standard_name': 'specific_humidity'}
down_pt.u.attrs = {'units': 'm s**-1', 'long_name': 'U component of wind', 'standard_name': 'eastward_wind'}
down_pt.v.attrs = {'units': 'm s**-1', 'long_name': 'V component of wind', 'standard_name': 'northward wind'}
down_pt.p.attrs = {'units': 'bar', 'long_name': 'Pression atmospheric', 'standard_name': 'pression_atmospheric'}

down_pt.ws.attrs = {'units': 'm s**-1', 'long_name': 'Wind speed', 'standard_name': 'wind_speed'}
down_pt.wd.attrs = {'units': 'deg', 'long_name': 'Wind direction', 'standard_name': 'wind_direction'}
down_pt.tp.attrs = {'units': 'mm hr**-1', 'long_name': 'Precipitation', 'standard_name': 'precipitation'}
down_pt.precip_lapse_rate.attrs = {'units': 'mm hr**-1',
'long_name': 'Precipitation after lapse-rate correction',
'standard_name': 'precipitation_after_lapse-rate_correction'}

down_pt.attrs = {'title':'Downscale point using TopoPyScale',
'date_created':dt.datetime.now().strftime('%Y/%m/%d %H:%M:%S')}

print(f'---> Storing point {pt_id} to {output_directory.name}/tmp/')

comp = dict(zlib=True, complevel=5)
encoding = {var: comp for var in down_pt.data_vars}
down_pt.to_netcdf(output_directory / f'tmp/down_pt_{pt_id}.nc',
engine='h5netcdf', encoding=encoding)

comp = dict(zlib=True, complevel=5)
encoding = {var: comp for var in surf_interp.data_vars}
surf_interp.to_netcdf(output_directory / f'tmp/surf_interp_{pt_id}.nc',
engine='h5netcdf', encoding=encoding)
down_pt = None
surf_interp = None
top, bot = None, None

fun_param = zip(row_list, plev_pt_list, surf_pt_list, meta_list) # construct here the tuple that goes into the pooling for arguments
tu.multicore_pooling(pt_downscale_interp, fun_param, n_core)
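`tu.multicore_pooling` now receives a module-level function plus the zipped per-point arguments, which is what makes it safe under spawn. Assuming the helper behaves like `multiprocessing.Pool.starmap` (a guess about TopoPyScale's tools module, shown here with a stand-in worker), the dispatch pattern is roughly:

```python
import multiprocessing as mp

def work(row, plev, surf, meta):
    # stand-in for pt_downscale_interp: importable at module level, so picklable
    return f'{row}: done'

if __name__ == '__main__':
    fun_param = zip(['pt_0', 'pt_1'], ['plev'] * 2, ['surf'] * 2, [{}] * 2)
    with mp.Pool(processes=2) as pool:
        # starmap unpacks each tuple into work(row, plev, surf, meta)
        print(pool.starmap(work, fun_param))  # ['pt_0: done', 'pt_1: done']
```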
