diff --git a/news/squeeze_warnings.rst b/news/squeeze_warnings.rst new file mode 100644 index 0000000..3c89e8a --- /dev/null +++ b/news/squeeze_warnings.rst @@ -0,0 +1,23 @@ +**Added:** + +* Warnings added to `squeeze` morph if the squeeze causes the grid to become non-monotonic. + +**Changed:** + +* + +**Deprecated:** + +* + +**Removed:** + +* + +**Fixed:** + +* + +**Security:** + +* diff --git a/src/diffpy/morph/morph_io.py b/src/diffpy/morph/morph_io.py index 84d5c15..613bd3a 100644 --- a/src/diffpy/morph/morph_io.py +++ b/src/diffpy/morph/morph_io.py @@ -408,9 +408,9 @@ def tabulate_results(multiple_morph_results): return tabulated_results -def handle_warnings(squeeze_morph): - if squeeze_morph is not None: - extrapolation_info = squeeze_morph.extrapolation_info +def handle_extrapolation_warnings(morph): + if morph is not None: + extrapolation_info = morph.extrapolation_info is_extrap_low = extrapolation_info["is_extrap_low"] is_extrap_high = extrapolation_info["is_extrap_high"] cutoff_low = extrapolation_info["cutoff_low"] @@ -443,3 +443,35 @@ def handle_warnings(squeeze_morph): wmsg, UserWarning, ) + + +def handle_check_increase_warning(squeeze_morph): + if squeeze_morph is not None: + if not squeeze_morph.strictly_increasing: + wmsg = ( + "Warning: The squeeze morph has interpolated your morphed " + "function from a non-monotonically increasing grid. " + "\nThis may not be an issue, but please check for your " + "particular case. " + "\nTo avoid squeeze making your grid non-monotonic, " + "here are some suggested fixes: " + "\n(1) Please decrease the order of your polynomial and " + "try again. " + "\n(2) If you are using initial guesses of all 0, please " + "ensure your objective function only requires a small " + "polynomial squeeze to match your reference. " + "(In other words, there is good agreement between the two " + "functions.) " + "\n(3) If you expect a large polynomial squeeze to be " + "needed, please ensure your initial parameters for the " + "polynomial morph result in good agreement between your " + "reference and objective functions. " + "One way to obtain such parameters is to " + "first apply a --hshift and --stretch morph. " + "Then, use the hshift parameter for a0 and stretch " + "parameter for a1." + ) + warnings.warn( + wmsg, + UserWarning, + ) diff --git a/src/diffpy/morph/morphapp.py b/src/diffpy/morph/morphapp.py index 327ed2d..e96aa45 100755 --- a/src/diffpy/morph/morphapp.py +++ b/src/diffpy/morph/morphapp.py @@ -707,9 +707,10 @@ def single_morph( chain(x_morph, y_morph, x_target, y_target) # THROW ANY WARNINGS HERE - io.handle_warnings(squeeze_morph) - io.handle_warnings(shift_morph) - io.handle_warnings(stretch_morph) + io.handle_extrapolation_warnings(squeeze_morph) + io.handle_check_increase_warning(squeeze_morph) + io.handle_extrapolation_warnings(shift_morph) + io.handle_extrapolation_warnings(stretch_morph) # Get Rw for the morph range rw = tools.getRw(chain) diff --git a/src/diffpy/morph/morphs/morphsqueeze.py b/src/diffpy/morph/morphs/morphsqueeze.py index 3d0250d..5909d65 100644 --- a/src/diffpy/morph/morphs/morphsqueeze.py +++ b/src/diffpy/morph/morphs/morphsqueeze.py @@ -1,6 +1,7 @@ """Class MorphSqueeze -- Apply a polynomial to squeeze the morph function.""" +import numpy from numpy.polynomial import Polynomial from scipy.interpolate import CubicSpline @@ -67,10 +68,36 @@ class MorphSqueeze(Morph): extrap_index_high = None squeeze_cutoff_low = None squeeze_cutoff_high = None + strictly_increasing = None def __init__(self, config=None): super().__init__(config) + def _check_strictly_increasing(self, x, x_sorted): + if list(x) != list(x_sorted): + self.strictly_increasing = False + else: + self.strictly_increasing = True + + def _sort_squeeze(self, x, y): + """Sort x,y according to the value of x.""" + xy = list(zip(x, y)) + xy_sorted = sorted(xy, key=lambda pair: pair[0]) + x_sorted, y_sorted = numpy.array(list(zip(*xy_sorted))) + return x_sorted, y_sorted + + def _handle_duplicates(self, x, y): + """Remove duplicated x and use the mean value of y corresponded + to the duplicated x.""" + x_unique, inv = numpy.unique(x, return_inverse=True) + if len(x_unique) == len(x): + return x, y + else: + y_avg = numpy.zeros_like(x_unique, dtype=float) + for idx, _ in enumerate(x_unique): + y_avg[idx] = y[inv == idx].mean() + return x_unique, y_avg + def morph(self, x_morph, y_morph, x_target, y_target): """Apply a polynomial to squeeze the morph function. @@ -82,9 +109,16 @@ def morph(self, x_morph, y_morph, x_target, y_target): coeffs = [self.squeeze[f"a{i}"] for i in range(len(self.squeeze))] squeeze_polynomial = Polynomial(coeffs) x_squeezed = self.x_morph_in + squeeze_polynomial(self.x_morph_in) - self.y_morph_out = CubicSpline(x_squeezed, self.y_morph_in)( + x_squeezed_sorted, y_morph_sorted = self._sort_squeeze( + x_squeezed, self.y_morph_in + ) + self._check_strictly_increasing(x_squeezed, x_squeezed_sorted) + x_squeezed_sorted, y_morph_sorted = self._handle_duplicates( + x_squeezed_sorted, y_morph_sorted + ) + self.y_morph_out = CubicSpline(x_squeezed_sorted, y_morph_sorted)( self.x_morph_in ) - self.set_extrapolation_info(x_squeezed, self.x_morph_in) + self.set_extrapolation_info(x_squeezed_sorted, self.x_morph_in) return self.xyallout diff --git a/tests/test_morphsqueeze.py b/tests/test_morphsqueeze.py index 07b9937..2bb65b0 100644 --- a/tests/test_morphsqueeze.py +++ b/tests/test_morphsqueeze.py @@ -54,6 +54,8 @@ def test_morphsqueeze(x_morph, x_target, squeeze_coeffs): x_target_expected = x_target y_target_expected = y_target # actual output + # turn the coefficients into a list for passing to Polynomial + # the morphsqueeze function itself requires a dictionary coeffs = [squeeze_coeffs[f"a{i}"] for i in range(len(squeeze_coeffs))] squeeze_polynomial = Polynomial(coeffs) x_squeezed = x_morph + squeeze_polynomial(x_morph) @@ -139,16 +141,16 @@ def test_morphsqueeze_extrapolate(user_filesystem, squeeze_coeffs, wmsg_gen): coeffs = [squeeze_coeffs[f"a{i}"] for i in range(len(squeeze_coeffs))] squeeze_polynomial = Polynomial(coeffs) x_squeezed = x_morph + squeeze_polynomial(x_morph) - with pytest.warns() as w: + with pytest.warns() as warning: morphpy.morph_arrays( np.array([x_morph, y_morph]).T, np.array([x_target, y_target]).T, squeeze=coeffs, apply=True, ) - assert len(w) == 1 - assert w[0].category is UserWarning - actual_wmsg = str(w[0].message) + assert len(warning) == 1 + assert warning[0].category is UserWarning + actual_wmsg = str(warning[0].message) expected_wmsg = wmsg_gen([min(x_squeezed), max(x_squeezed)]) assert actual_wmsg == expected_wmsg @@ -170,3 +172,134 @@ def test_morphsqueeze_extrapolate(user_filesystem, squeeze_coeffs, wmsg_gen): ) with pytest.warns(UserWarning, match=expected_wmsg): single_morph(parser, opts, pargs, stdout_flag=False) + + +def test_non_unique_grid(): + # Test giving morphsqueeze a non-unique grid + # Expect it to return a unique grid + squeeze_coeffs = {"a0": 0.01, "a1": 0.01, "a2": -0.1} + x_grid = np.linspace(0, 10, 101) + + coeffs = [squeeze_coeffs[f"a{i}"] for i in range(len(squeeze_coeffs))] + squeeze_polynomial = Polynomial(coeffs) + x_morph = x_grid + squeeze_polynomial(x_grid) + x_gradient = np.diff(x_morph) + x_gradient_sign = np.sign(x_gradient) + # non-strictly increasing means the gradient becomes 0 or negative + assert not np.all(x_gradient_sign > 0) + + x_target = np.linspace(min(x_morph), max(x_morph), len(x_morph)) + y_target = np.sin(x_target) + + y_morph = np.sin(x_morph) + # apply no squeeze, but the morph should sort the function + _, table = morphpy.morph_arrays( + np.array([x_morph, y_morph]).T, + np.array([x_target, y_target]).T, + squeeze=[0, 0, 0], + apply=True, + ) + x_refined, _ = table[:, 0], table[:, 1] + + # grid should be properly sorted + assert np.allclose(x_refined, x_target) + # note that the function itself may be distorted + + +@pytest.mark.parametrize( + "squeeze_coeffs, x_morph", + [ + # The following squeezes make the function non-monotonic. + # Expect code to work but issue the correct warning. + ([-1, -1, 2], np.linspace(-1, 1, 101)), + ( + [-1, -1, 0, 0, 2], + np.linspace(-1, 1, 101), + ), + ], +) +def test_squeeze_warnings(user_filesystem, squeeze_coeffs, x_morph): + # call in .py + x_target = x_morph + y_target = np.sin(x_target) + squeeze_polynomial = Polynomial(squeeze_coeffs) + x_squeezed = x_morph + squeeze_polynomial(x_morph) + y_morph = np.sin(x_squeezed) + morph = MorphSqueeze() + morph.squeeze = squeeze_coeffs + with pytest.warns() as warning: + morphpy.morph_arrays( + np.array([x_morph, y_morph]).T, + np.array([x_target, y_target]).T, + squeeze=squeeze_coeffs, + apply=True, + ) + assert len(warning) == 1 + assert warning[0].category is UserWarning + actual_wmsg = str(warning[0].message) + expected_wmsg = ( + "Warning: The squeeze morph has interpolated your morphed " + "function from a non-monotonically increasing grid. " + "\nThis may not be an issue, but please check for your " + "particular case. " + "\nTo avoid squeeze making your grid non-monotonic, " + "here are some suggested fixes: " + "\n(1) Please decrease the order of your polynomial and try again. " + "\n(2) If you are using initial guesses of all 0, please ensure " + "your objective function only requires a small polynomial " + "squeeze to match your reference. " + "(In other words, there is good agreement between the two " + "functions.) " + "\n(3) If you expect a large polynomial squeeze to be needed, " + "please ensure your initial parameters for the polynomial " + "morph result in good agreement between your reference and " + "objective functions. One way to obtain such parameters is to " + "first apply a --hshift and --stretch morph. " + "Then, use the hshift parameter for a0 and stretch parameter for a1." + ) + assert expected_wmsg in actual_wmsg + + # call in CLI + morph_file, target_file = create_morph_data_file( + user_filesystem / "cwd_dir", x_morph, y_morph, x_target, y_target + ) + parser = create_option_parser() + (opts, pargs) = parser.parse_args( + [ + "--squeeze", + ",".join(map(str, squeeze_coeffs)), + f"{morph_file.as_posix()}", + f"{target_file.as_posix()}", + "--apply", + "-n", + ] + ) + with pytest.warns(UserWarning) as warning: + single_morph(parser, opts, pargs, stdout_flag=False) + assert len(warning) == 1 + actual_wmsg = str(warning[0].message) + assert expected_wmsg in actual_wmsg + + +@pytest.mark.parametrize( + "x_sampled", + [ + # Expected output: all repeated datapoints are removed + # Test one duplicate per number + np.array([0, 0, 1, 1, 2, 2, 3, 3]), + # Test more than one duplicates per number + np.array([0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2]), + # Test with only one grid number + np.array([0, 0, 0, 0]), + # Test no duplicates + np.array([0, 1, 2, 3, 4]), + ], +) +def test_handle_duplicates(x_sampled): + morph = MorphSqueeze() + y_sampled = np.sin(x_sampled) + x_handled, y_handled = morph._handle_duplicates(x_sampled, y_sampled) + x_target = np.unique(x_sampled) + y_target = np.array([y_sampled[x_sampled == x].mean() for x in x_target]) + assert np.allclose(x_handled, x_target) + assert np.allclose(y_handled, y_target)