from pathlib import Path
import pytest
from xarray import concat
from xarray.tests.test_dataset import create_test_data
import xarray.testing as xrt
from natsort import natsorted
from xbout.load import (
_check_filetype,
_expand_wildcards,
_expand_filepaths,
_arrange_for_concatenation,
_trim,
_infer_contains_boundaries,
open_boutdataset,
_BOUT_PER_PROC_VARIABLES,
_BOUT_TIME_DEPENDENT_META_VARS,
)
from xbout.utils import _separate_metadata
from xbout.tests.utils_for_tests import create_bout_ds, METADATA_VARS
def test_check_extensions(tmp_path):
files_dir = tmp_path.joinpath("data")
files_dir.mkdir()
example_nc_file = files_dir.joinpath("example.nc")
example_nc_file.write_text("content_nc")
filetype = _check_filetype(example_nc_file)
assert filetype == "netcdf4"
example_hdf5_file = files_dir.joinpath("example.h5netcdf")
example_hdf5_file.write_text("content_hdf5")
filetype = _check_filetype(example_hdf5_file)
assert filetype == "h5netcdf"
example_invalid_file = files_dir.joinpath("example.txt")
example_invalid_file.write_text("content_txt")
with pytest.raises(IOError):
filetype = _check_filetype(example_invalid_file)
def test_set_fci_coords(create_example_grid_file_fci, create_example_files_fci):
grid = create_example_grid_file_fci
data = create_example_files_fci
ds = open_boutdataset(data, gridfilepath=grid, geometry="fci")
assert "R" in ds
assert "Z" in ds
class TestPathHandling:
def test_glob_expansion_single(self, tmp_path):
files_dir = tmp_path.joinpath("data")
files_dir.mkdir()
example_file = files_dir.joinpath("example.0.nc")
example_file.write_text("content")
path = example_file
filepaths = _expand_wildcards(path)
assert filepaths[0] == example_file
path = files_dir.joinpath("example.*.nc")
filepaths = _expand_wildcards(path)
assert filepaths[0] == example_file
@pytest.mark.parametrize(
"ii, jj", [(1, 1), (1, 4), (3, 1), (5, 3), (12, 1), (1, 12), (121, 2), (3, 111)]
)
def test_glob_expansion_both(self, tmp_path, ii, jj):
files_dir = tmp_path.joinpath("data")
files_dir.mkdir()
filepaths = []
for i in range(ii):
example_run_dir = files_dir.joinpath("run" + str(i))
example_run_dir.mkdir()
for j in range(jj):
example_file = example_run_dir.joinpath("example." + str(j) + ".nc")
example_file.write_text("content")
filepaths.append(example_file)
expected_filepaths = natsorted(filepaths, key=lambda filepath: str(filepath))
path = files_dir.joinpath("run*/example.*.nc")
actual_filepaths = _expand_wildcards(path)
assert actual_filepaths == expected_filepaths
@pytest.mark.parametrize(
"ii, jj", [(1, 1), (1, 4), (3, 1), (5, 3), (1, 12), (3, 111)]
)
def test_glob_expansion_brackets(self, tmp_path, ii, jj):
files_dir = tmp_path.joinpath("data")
files_dir.mkdir()
filepaths = []
for i in range(ii):
example_run_dir = files_dir.joinpath("run" + str(i))
example_run_dir.mkdir()
for j in range(jj):
example_file = example_run_dir.joinpath("example." + str(j) + ".nc")
example_file.write_text("content")
filepaths.append(example_file)
expected_filepaths = natsorted(filepaths, key=lambda filepath: str(filepath))
path = files_dir.joinpath("run[1-9]/example.*.nc")
actual_filepaths = _expand_wildcards(path)
assert actual_filepaths == expected_filepaths[jj:]
def test_no_files(self, tmp_path):
files_dir = tmp_path.joinpath("data")
files_dir.mkdir()
with pytest.raises(IOError):
path = files_dir.joinpath("run*/example.*.nc")
_expand_filepaths(path)
@pytest.fixture()
def create_filepaths():
return _create_filepaths
def _create_filepaths(nxpe=1, nype=1, nt=1):
filepaths = []
for t in range(nt):
for i in range(nype):
for j in range(nxpe):
file_num = j + nxpe * i
path = "./run{}".format(str(t)) + "/BOUT.dmp.{}.nc".format(
str(file_num)
)
filepaths.append(path)
return filepaths
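# As a quick illustration of the ordering produced above: the helper loops over
# time index, then y-processor, then x-processor, so for example
# _create_filepaths(nxpe=2, nype=2, nt=2) returns
#   ["./run0/BOUT.dmp.0.nc", "./run0/BOUT.dmp.1.nc",
#    "./run0/BOUT.dmp.2.nc", "./run0/BOUT.dmp.3.nc",
#    "./run1/BOUT.dmp.0.nc", "./run1/BOUT.dmp.1.nc",
#    "./run1/BOUT.dmp.2.nc", "./run1/BOUT.dmp.3.nc"]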
class TestArrange:
def test_arrange_single(self, create_filepaths):
paths = create_filepaths(nxpe=1, nype=1, nt=1)
expected_path_grid = [[["./run0/BOUT.dmp.0.nc"]]]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=1, nype=1
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == [None, None, None]
def test_arrange_along_x(self, create_filepaths):
paths = create_filepaths(nxpe=3, nype=1, nt=1)
expected_path_grid = [
[["./run0/BOUT.dmp.0.nc", "./run0/BOUT.dmp.1.nc", "./run0/BOUT.dmp.2.nc"]]
]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=3, nype=1
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == [None, None, "x"]
def test_arrange_along_y(self, create_filepaths):
paths = create_filepaths(nxpe=1, nype=3, nt=1)
expected_path_grid = [
[
["./run0/BOUT.dmp.0.nc"],
["./run0/BOUT.dmp.1.nc"],
["./run0/BOUT.dmp.2.nc"],
]
]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=1, nype=3
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == [None, "y", None]
def test_arrange_along_t(self, create_filepaths):
paths = create_filepaths(nxpe=1, nype=1, nt=3)
expected_path_grid = [
[["./run0/BOUT.dmp.0.nc"]],
[["./run1/BOUT.dmp.0.nc"]],
[["./run2/BOUT.dmp.0.nc"]],
]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=1, nype=1
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == ["t", None, None]
def test_arrange_along_xy(self, create_filepaths):
paths = create_filepaths(nxpe=3, nype=2, nt=1)
expected_path_grid = [
[
[
"./run0/BOUT.dmp.0.nc",
"./run0/BOUT.dmp.1.nc",
"./run0/BOUT.dmp.2.nc",
],
[
"./run0/BOUT.dmp.3.nc",
"./run0/BOUT.dmp.4.nc",
"./run0/BOUT.dmp.5.nc",
],
]
]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=3, nype=2
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == [None, "y", "x"]
def test_arrange_along_xt(self, create_filepaths):
paths = create_filepaths(nxpe=3, nype=1, nt=2)
expected_path_grid = [
[["./run0/BOUT.dmp.0.nc", "./run0/BOUT.dmp.1.nc", "./run0/BOUT.dmp.2.nc"]],
[["./run1/BOUT.dmp.0.nc", "./run1/BOUT.dmp.1.nc", "./run1/BOUT.dmp.2.nc"]],
]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=3, nype=1
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == ["t", None, "x"]
def test_arrange_along_xyt(self, create_filepaths):
paths = create_filepaths(nxpe=3, nype=2, nt=2)
expected_path_grid = [
[
[
"./run0/BOUT.dmp.0.nc",
"./run0/BOUT.dmp.1.nc",
"./run0/BOUT.dmp.2.nc",
],
[
"./run0/BOUT.dmp.3.nc",
"./run0/BOUT.dmp.4.nc",
"./run0/BOUT.dmp.5.nc",
],
],
[
[
"./run1/BOUT.dmp.0.nc",
"./run1/BOUT.dmp.1.nc",
"./run1/BOUT.dmp.2.nc",
],
[
"./run1/BOUT.dmp.3.nc",
"./run1/BOUT.dmp.4.nc",
"./run1/BOUT.dmp.5.nc",
],
],
]
actual_path_grid, actual_concat_dims = _arrange_for_concatenation(
paths, nxpe=3, nype=2
)
assert expected_path_grid == actual_path_grid
assert actual_concat_dims == ["t", "y", "x"]
# TODO also test loading multiple files which have guard cells
class TestOpen:
def test_single_file(self, tmp_path_factory, bout_xyt_example_files):
path = bout_xyt_example_files(
tmp_path_factory, nxpe=1, nype=1, nt=1, write_to_disk=True
)
with pytest.warns(UserWarning):
actual = open_boutdataset(datapath=path, keep_xboundaries=False)
expected = create_bout_ds()
expected = expected.set_coords(["t_array", "dx", "dy", "dz"]).rename(
t_array="t"
)
xrt.assert_equal(
actual.drop_vars(["x", "y", "z"]).load(),
expected.drop_vars(
METADATA_VARS
+ _BOUT_PER_PROC_VARIABLES
+ _BOUT_TIME_DEPENDENT_META_VARS,
errors="ignore",
),
)
# check creation without writing to disk gives identical result
fake_ds_list = bout_xyt_example_files(None, nxpe=1, nype=1, nt=1)
with pytest.warns(UserWarning):
fake = open_boutdataset(datapath=fake_ds_list, keep_xboundaries=False)
xrt.assert_identical(actual, fake)
def test_squashed_file(self, tmp_path_factory, bout_xyt_example_files):
path = bout_xyt_example_files(
tmp_path_factory, nxpe=4, nype=3, nt=1, squashed=True, write_to_disk=True
)
with pytest.warns(UserWarning):
actual = open_boutdataset(datapath=path, keep_xboundaries=False)
expected = create_bout_ds(lengths=(6, 8, 12, 7))
expected = expected.set_coords(["t_array", "dx", "dy", "dz"]).rename(
t_array="t"
)
xrt.assert_equal(
actual.drop_vars(["x", "y", "z"]).load(),
expected.drop_vars(
METADATA_VARS
+ _BOUT_PER_PROC_VARIABLES
+ _BOUT_TIME_DEPENDENT_META_VARS,
errors="ignore",
),
)
# check creation without writing to disk gives identical result
fake_ds_list = bout_xyt_example_files(None, nxpe=4, nype=3, nt=1, squashed=True)
with pytest.warns(UserWarning):
fake = open_boutdataset(datapath=fake_ds_list, keep_xboundaries=False)
xrt.assert_identical(actual, fake)
@pytest.mark.parametrize(
"keep_xboundaries", [False, pytest.param(True, marks=pytest.mark.long)]
)
@pytest.mark.parametrize(
"keep_yboundaries", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_squashed_doublenull(
self,
tmp_path_factory,
bout_xyt_example_files,
keep_xboundaries,
keep_yboundaries,
):
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=4,
nype=6,
nt=1,
lengths=(6, 2, 4, 7),
guards={"x": 2, "y": 2},
squashed=True,
topology="lower-disconnected-double-null",
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=path,
keep_xboundaries=keep_xboundaries,
keep_yboundaries=keep_yboundaries,
)
# The double-null topology has four y-boundary regions of myg=2 cells each, so
# with boundaries kept the y-size is nype*lengths[2] + 4*myg = 6*4 + 8 = 32,
# and 32 - 4*2 = 24 with the boundaries removed.
assert ds.sizes["t"] == 6
assert ds.sizes["x"] == (12 if keep_xboundaries else 8)
assert ds.sizes["y"] == (32 if keep_yboundaries else 24)
assert ds.sizes["z"] == 7
@pytest.mark.parametrize(
"keep_xboundaries", [False, pytest.param(True, marks=pytest.mark.long)]
)
@pytest.mark.parametrize(
"keep_yboundaries", [False, pytest.param(True, marks=pytest.mark.long)]
)
def test_squashed_doublenull_file(
self,
tmp_path_factory,
bout_xyt_example_files,
keep_xboundaries,
keep_yboundaries,
):
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=4,
nype=6,
nt=1,
lengths=(6, 4, 4, 7),
guards={"x": 2, "y": 2},
squashed=True,
write_to_disk=True,
topology="upper-disconnected-double-null",
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=path,
keep_xboundaries=keep_xboundaries,
keep_yboundaries=keep_yboundaries,
)
# The double-null topology has four y-boundary regions of myg=2 cells each, so
# with boundaries kept the y-size is nype*lengths[2] + 4*myg = 6*4 + 8 = 32,
# and 32 - 4*2 = 24 with the boundaries removed.
assert ds.sizes["t"] == 6
assert ds.sizes["x"] == (20 if keep_xboundaries else 16)
assert ds.sizes["y"] == (32 if keep_yboundaries else 24)
assert ds.sizes["z"] == 7
def test_combine_along_x(self, tmp_path_factory, bout_xyt_example_files):
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=4,
nype=1,
nt=1,
syn_data_type="stepped",
write_to_disk=True,
)
with pytest.warns(UserWarning):
actual = open_boutdataset(datapath=path, keep_xboundaries=False)
bout_ds = create_bout_ds
expected = concat(
[bout_ds(0), bout_ds(1), bout_ds(2), bout_ds(3)],
dim="x",
data_vars="minimal",
)
expected = expected.set_coords(["t_array", "dx", "dy", "dz"]).rename(
t_array="t"
)
xrt.assert_equal(
actual.drop_vars(["x", "y", "z"]).load(),
expected.drop_vars(
METADATA_VARS + _BOUT_PER_PROC_VARIABLES, errors="ignore"
),
)
# check creation without writing to disk gives identical result
fake_ds_list = bout_xyt_example_files(
None, nxpe=4, nype=1, nt=1, syn_data_type="stepped"
)
with pytest.warns(UserWarning):
fake = open_boutdataset(datapath=fake_ds_list, keep_xboundaries=False)
xrt.assert_identical(actual, fake)
def test_combine_along_y(self, tmp_path_factory, bout_xyt_example_files):
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=1,
nype=3,
nt=1,
syn_data_type="stepped",
write_to_disk=True,
)
with pytest.warns(UserWarning):
actual = open_boutdataset(datapath=path, keep_xboundaries=False)
bout_ds = create_bout_ds
expected = concat(
[bout_ds(0), bout_ds(1), bout_ds(2)], dim="y", data_vars="minimal"
)
expected = expected.set_coords(["t_array", "dx", "dy", "dz"]).rename(
t_array="t"
)
xrt.assert_equal(
actual.drop_vars(["x", "y", "z"]).load(),
expected.drop_vars(
METADATA_VARS + _BOUT_PER_PROC_VARIABLES, errors="ignore"
),
)
# check creation without writing to disk gives identical result
fake_ds_list = bout_xyt_example_files(
None, nxpe=1, nype=3, nt=1, syn_data_type="stepped"
)
with pytest.warns(UserWarning):
fake = open_boutdataset(datapath=fake_ds_list, keep_xboundaries=False)
xrt.assert_identical(actual, fake)
@pytest.mark.skip
def test_combine_along_t(self):
...
@pytest.mark.parametrize(
"bout_v5,metric_3D", [(False, False), (True, False), (True, True)]
)
@pytest.mark.parametrize("lengths", [(6, 2, 4, 7), (6, 2, 4, 1)])
def test_combine_along_xy(
self, tmp_path_factory, bout_xyt_example_files, bout_v5, metric_3D, lengths
):
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=4,
nype=3,
nt=1,
lengths=lengths,
syn_data_type="stepped",
write_to_disk=True,
bout_v5=bout_v5,
metric_3D=metric_3D,
)
with pytest.warns(UserWarning):
actual = open_boutdataset(datapath=path, keep_xboundaries=False)
def bout_ds(syn_data_type):
return create_bout_ds(
syn_data_type, bout_v5=bout_v5, metric_3D=metric_3D, lengths=lengths
)
line1 = concat(
[bout_ds(0), bout_ds(1), bout_ds(2), bout_ds(3)],
dim="x",
data_vars="minimal",
)
line2 = concat(
[bout_ds(4), bout_ds(5), bout_ds(6), bout_ds(7)],
dim="x",
data_vars="minimal",
)
line3 = concat(
[bout_ds(8), bout_ds(9), bout_ds(10), bout_ds(11)],
dim="x",
data_vars="minimal",
)
expected = concat([line1, line2, line3], dim="y", data_vars="minimal")
expected = expected.set_coords(["t_array", "dx", "dy", "dz"]).rename(
t_array="t"
)
vars_to_drop = METADATA_VARS + _BOUT_PER_PROC_VARIABLES
xrt.assert_equal(
actual.drop_vars(["x", "y", "z"]).load(),
expected.drop_vars(vars_to_drop, errors="ignore"),
)
# check creation without writing to disk gives identical result
fake_ds_list = bout_xyt_example_files(
None,
nxpe=4,
nype=3,
nt=1,
lengths=lengths,
syn_data_type="stepped",
bout_v5=bout_v5,
metric_3D=metric_3D,
)
with pytest.warns(UserWarning):
fake = open_boutdataset(datapath=fake_ds_list, keep_xboundaries=False)
xrt.assert_identical(actual, fake)
def test_toroidal(self, tmp_path_factory, bout_xyt_example_files):
# actually write these to disk to test the loading fully
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=3,
nype=3,
nt=1,
syn_data_type="stepped",
grid="grid",
write_to_disk=True,
)
actual = open_boutdataset(
datapath=path,
geometry="toroidal",
gridfilepath=path.parent.joinpath("grid.nc"),
)
# check dataset can be saved
save_dir = tmp_path_factory.mktemp("data")
actual.bout.save(save_dir.joinpath("boutdata.nc"))
# check creation without writing to disk gives identical result
fake_ds_list, fake_grid_ds = bout_xyt_example_files(
None,
nxpe=3,
nype=3,
nt=1,
syn_data_type="stepped",
grid="grid",
)
fake = open_boutdataset(
datapath=fake_ds_list, geometry="toroidal", gridfilepath=fake_grid_ds
)
xrt.assert_identical(actual, fake)
def test_salpha(self, tmp_path_factory, bout_xyt_example_files):
path = bout_xyt_example_files(
tmp_path_factory,
nxpe=3,
nype=3,
nt=1,
syn_data_type="stepped",
grid="grid",
write_to_disk=True,
)
actual = open_boutdataset(
datapath=path,
geometry="s-alpha",
gridfilepath=path.parent.joinpath("grid.nc"),
)
# check dataset can be saved
save_dir = tmp_path_factory.mktemp("data")
actual.bout.save(save_dir.joinpath("boutdata.nc"))
# check creation without writing to disk gives identical result
fake_ds_list, fake_grid_ds = bout_xyt_example_files(
None, nxpe=3, nype=3, nt=1, syn_data_type="stepped", grid="grid"
)
fake = open_boutdataset(
datapath=fake_ds_list, geometry="s-alpha", gridfilepath=fake_grid_ds
)
xrt.assert_identical(actual, fake)
def test_drop_vars(self, tmp_path_factory, bout_xyt_example_files):
datapath = bout_xyt_example_files(
tmp_path_factory,
nxpe=4,
nype=1,
nt=1,
syn_data_type="stepped",
write_to_disk=True,
)
with pytest.warns(UserWarning):
ds = open_boutdataset(
datapath=datapath, keep_xboundaries=False, drop_variables=["T"]
)
assert "T" not in ds.keys()
assert "n" in ds.keys()
@pytest.mark.skip
def test_combine_along_tx(self):
...
def test_restarts(self):
datapath = Path(__file__).parent.joinpath(
"data", "restart", "BOUT.restart.*.nc"
)
ds = open_boutdataset(datapath, keep_xboundaries=True, keep_yboundaries=True)
assert "T" in ds
_test_processor_layouts_list = [
# No parallelization
(0, 0, 1, 1, {"x": True, "y": True}, {"x": True, "y": True}),
# 1d parallelization along x:
# Left
(0, 0, 3, 1, {"x": True, "y": True}, {"x": False, "y": True}),
# Middle
(1, 0, 3, 1, {"x": False, "y": True}, {"x": False, "y": True}),
# Right
(2, 0, 3, 1, {"x": False, "y": True}, {"x": True, "y": True}),
# 1d parallelization along y:
# Bottom
(0, 0, 1, 3, {"x": True, "y": True}, {"x": True, "y": False}),
# Middle
(0, 1, 1, 3, {"x": True, "y": False}, {"x": True, "y": False}),
# Top
(0, 2, 1, 3, {"x": True, "y": False}, {"x": True, "y": True}),
# 2d parallelization:
# Bottom left corner
(0, 0, 3, 4, {"x": True, "y": True}, {"x": False, "y": False}),
# Bottom right corner
(2, 0, 3, 4, {"x": False, "y": True}, {"x": True, "y": False}),
# Top left corner
(0, 3, 3, 4, {"x": True, "y": False}, {"x": False, "y": True}),
# Top right corner
(2, 3, 3, 4, {"x": False, "y": False}, {"x": True, "y": True}),
# Centre
(1, 2, 3, 4, {"x": False, "y": False}, {"x": False, "y": False}),
# Left side
(0, 2, 3, 4, {"x": True, "y": False}, {"x": False, "y": False}),
# Right side
(2, 2, 3, 4, {"x": False, "y": False}, {"x": True, "y": False}),
# Bottom side
(1, 0, 3, 4, {"x": False, "y": True}, {"x": False, "y": False}),
# Top side
(1, 3, 3, 4, {"x": False, "y": False}, {"x": False, "y": True}),
]
_test_processor_layouts_doublenull_list = [
# 1d parallelization along y:
# Bottom
(0, 0, 1, 4, {"x": True, "y": True}, {"x": True, "y": False}),
# Lower Middle
(0, 1, 1, 4, {"x": True, "y": False}, {"x": True, "y": True}),
# Upper Middle
(0, 2, 1, 4, {"x": True, "y": True}, {"x": True, "y": False}),
# Top
(0, 3, 1, 4, {"x": True, "y": False}, {"x": True, "y": True}),
# 2d parallelization:
# Bottom left corner
(0, 0, 3, 4, {"x": True, "y": True}, {"x": False, "y": False}),
(1, 0, 3, 4, {"x": False, "y": True}, {"x": False, "y": False}),
# Bottom right corner
(2, 0, 3, 4, {"x": False, "y": True}, {"x": True, "y": False}),
(0, 1, 3, 4, {"x": True, "y": False}, {"x": False, "y": True}),
(1, 1, 3, 4, {"x": False, "y": False}, {"x": False, "y": True}),
(2, 1, 3, 4, {"x": False, "y": False}, {"x": True, "y": True}),
(0, 2, 3, 4, {"x": True, "y": True}, {"x": False, "y": False}),
(1, 2, 3, 4, {"x": False, "y": True}, {"x": False, "y": False}),
(2, 2, 3, 4, {"x": False, "y": True}, {"x": True, "y": False}),
# Top left corner
(0, 3, 3, 4, {"x": True, "y": False}, {"x": False, "y": True}),
(1, 3, 3, 4, {"x": False, "y": False}, {"x": False, "y": True}),
# Top right corner
(2, 3, 3, 4, {"x": False, "y": False}, {"x": True, "y": True}),
]
class TestTrim:
@pytest.mark.parametrize("is_restart", [False, True])
def test_no_trim(self, is_restart):
ds = create_test_data(0)
# Manually add filename - encoding normally added by xr.open_dataset
ds.encoding["source"] = "folder0/BOUT.dmp.0.nc"
actual = _trim(
ds, guards={}, keep_boundaries={}, nxpe=1, nype=1, is_restart=is_restart
)
xrt.assert_equal(actual, ds)
def test_trim_guards(self):
ds = create_test_data(0)
# Manually add filename - encoding normally added by xr.open_dataset
ds.encoding["source"] = "folder0/BOUT.dmp.0.nc"
actual = _trim(
ds, guards={"time": 2}, keep_boundaries={}, nxpe=1, nype=1, is_restart=False
)
selection = {"time": slice(2, -2)}
expected = ds.isel(**selection)
xrt.assert_equal(expected, actual)
@pytest.mark.parametrize(
"xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries",
_test_processor_layouts_list,
)
def test_infer_boundaries_2d_parallelization(
self, xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries
):
"""
Numbering scheme for nxpe=3, nype=4
.. code:: text
y 9 10 11
^ 6 7 8
| 3 4 5
| 0 1 2
-----> x
"""
ds = create_test_data(0)
ds["jyseps2_1"] = 0
ds["jyseps1_2"] = 0
ds["PE_XIND"] = xproc
ds["PE_YIND"] = yproc
actual_lower_boundaries, actual_upper_boundaries = _infer_contains_boundaries(
ds, nxpe, nype
)
assert actual_lower_boundaries == lower_boundaries
assert actual_upper_boundaries == upper_boundaries
@pytest.mark.parametrize(
"xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries",
_test_processor_layouts_doublenull_list,
)
def test_infer_boundaries_2d_parallelization_doublenull(
self, xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries
):
"""
Numbering scheme for nxpe=3, nype=4
.. code:: text
y 9 10 11
^ 6 7 8
| 3 4 5
| 0 1 2
-----> x
"""
ds = create_test_data(0)
ds["jyseps2_1"] = 3
ds["jyseps1_2"] = 11
ds["ny_inner"] = 8
ds["MYSUB"] = 4
ds["PE_XIND"] = xproc
ds["PE_YIND"] = yproc
actual_lower_boundaries, actual_upper_boundaries = _infer_contains_boundaries(
ds, nxpe, nype
)
assert actual_lower_boundaries == lower_boundaries
assert actual_upper_boundaries == upper_boundaries
@pytest.mark.parametrize(
"xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries",
_test_processor_layouts_list,
)
def test_infer_boundaries_2d_parallelization_by_filenum(
self, xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries
):
"""
Numbering scheme for nxpe=3, nype=4
.. code:: text
y 9 10 11
^ 6 7 8
| 3 4 5
| 0 1 2
-----> x
"""
filenum = yproc * nxpe + xproc
ds = create_test_data(0)
ds["jyseps2_1"] = 0
ds["jyseps1_2"] = 0
ds.encoding["source"] = "folder0/BOUT.dmp." + str(filenum) + ".nc"
actual_lower_boundaries, actual_upper_boundaries = _infer_contains_boundaries(
ds, nxpe, nype
)
assert actual_lower_boundaries == lower_boundaries
assert actual_upper_boundaries == upper_boundaries
@pytest.mark.parametrize(
"xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries",
_test_processor_layouts_doublenull_list,
)
def test_infer_boundaries_2d_parallelization_doublenull_by_filenum(
self, xproc, yproc, nxpe, nype, lower_boundaries, upper_boundaries
):
"""
Numbering scheme for nxpe=3, nype=4
.. code:: text
y 9 10 11
^ 6 7 8
| 3 4 5
| 0 1 2
-----> x
"""
filenum = yproc * nxpe + xproc
ds = create_test_data(0)
ds["jyseps2_1"] = 3
ds["jyseps1_2"] = 11
ds["ny_inner"] = 8
ds["MYSUB"] = 4
ds.encoding["source"] = "folder0/BOUT.dmp." + str(filenum) + ".nc"
actual_lower_boundaries, actual_upper_boundaries = _infer_contains_boundaries(
ds, nxpe, nype
)
assert actual_lower_boundaries == lower_boundaries
assert actual_upper_boundaries == upper_boundaries
@pytest.mark.parametrize("is_restart", [False, True])
def test_keep_xboundaries(self, is_restart):
ds = create_test_data(0)
ds = ds.rename({"dim2": "x"})
# Manually add filename - encoding normally added by xr.open_dataset
ds.encoding["source"] = "folder0/BOUT.dmp.0.nc"
ds["jyseps2_1"] = 8
ds["jyseps1_2"] = 8
actual = _trim(
ds,
guards={"x": 2},
keep_boundaries={"x": True},
nxpe=1,
nype=1,
is_restart=is_restart,
)
expected = ds # Should be unchanged
xrt.assert_equal(expected, actual)
@pytest.mark.parametrize("is_restart", [False, True])
def test_keep_yboundaries(self, is_restart):
ds = create_test_data(0)
ds = ds.rename({"dim2": "y"})
# Manually add filename - encoding normally added by xr.open_dataset
ds.encoding["source"] = "folder0/BOUT.dmp.0.nc"
ds["jyseps2_1"] = 8
ds["jyseps1_2"] = 8
actual = _trim(
ds,
guards={"y": 2},
keep_boundaries={"y": True},
nxpe=1,
nype=1,
is_restart=is_restart,
)
expected = ds # Should be unchanged
xrt.assert_equal(expected, actual)
@pytest.mark.parametrize(
"filenum, lower, upper",
[(0, True, False), (1, False, True), (2, True, False), (3, False, True)],
)
@pytest.mark.parametrize("is_restart", [False, True])
def test_keep_yboundaries_doublenull_by_filenum(
self, filenum, lower, upper, is_restart
):
ds = create_test_data(0)
ds = ds.rename({"dim2": "y"})
# Manually add filename - encoding normally added by xr.open_dataset
ds.encoding["source"] = "folder0/BOUT.dmp." + str(filenum) + ".nc"
ds["jyseps2_1"] = 3
ds["jyseps1_2"] = 11
ds["ny_inner"] = 8
ds["MYSUB"] = 4
actual = _trim(
ds,
guards={"y": 2},
keep_boundaries={"y": True},
nxpe=1,
nype=4,
is_restart=is_restart,
)
expected = ds # Should be unchanged
if not lower:
expected = expected.isel(y=slice(2, None, None))
if not upper:
expected = expected.isel(y=slice(None, -2, None))
xrt.assert_equal(expected, actual)
@pytest.mark.parametrize("is_restart", [False, True])
def test_trim_timing_info(self, is_restart):
ds = create_test_data(0)
from xbout.load import _BOUT_PER_PROC_VARIABLES
# remove a couple of entries from _BOUT_PER_PROC_VARIABLES so we test that _trim
# does not fail if not all of them are present
_BOUT_PER_PROC_VARIABLES = _BOUT_PER_PROC_VARIABLES[:-2]
for v in _BOUT_PER_PROC_VARIABLES:
ds[v] = 42.0
ds = _trim(
ds, guards={}, keep_boundaries={}, nxpe=1, nype=1, is_restart=is_restart
)
expected = create_test_data(0)
xrt.assert_equal(ds, expected)