Coverage for physped/utils/functions.py: 44%
54 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
1import logging
2from functools import reduce
3from typing import Any, Callable, Tuple
5import numpy as np
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Type alias for a callable that takes one argument and returns one value;
# used to annotate the functions chained by compose_functions.
Composable = Callable[[Any], Any]
def compose_functions(*functions: Composable) -> Composable:
    """Chain several functions into a single callable.

    The functions are applied from left to right: the first function receives
    the input and every following function receives the result of the one
    before it. Keyword arguments given to the composed callable are forwarded
    unchanged to every function in the chain.

    Parameters:
    - functions: The functions to chain, in order of application.

    Returns:
    - A callable ``composed(x, **kwargs)`` returning the output of the last
      function. Without any functions it returns ``x`` unchanged.
    """

    def composed(x, **kwargs):
        result = x
        for fn in functions:
            result = fn(result, **kwargs)
        return result

    return composed
def cartesian_to_polar_coordinates(x: float, y: float) -> tuple:
    """Convert cartesian coordinates to polar coordinates.

    Parameters:
    - x: The x-coordinate.
    - y: The y-coordinate.

    Returns:
    - A tuple ``(rho, phi)``: the distance to the origin and the angle in
      radians as computed by ``np.arctan2(y, x)``.
    """
    squared_distance = x**2 + y**2
    return np.sqrt(squared_distance), np.arctan2(y, x)
def polar_to_cartesian_coordinates(rho: float, phi: float) -> tuple:
    """Convert polar coordinates to cartesian coordinates.

    Parameters:
    - rho: The radial distance from the origin to the point.
    - phi: The angle in radians.

    Returns:
    - A tuple ``(x, y)`` containing the cartesian coordinates.
    """
    cos_phi, sin_phi = np.cos(phi), np.sin(phi)
    return rho * cos_phi, rho * sin_phi
def get_slice_of_multidimensional_matrix(
    a: np.ndarray,
    slice_x: Tuple,
    slice_y: Tuple,
    slice_theta: Tuple,
    slice_r: Tuple,
) -> np.ndarray:
    """
    Get a slice of a multidimensional matrix with wrap-around indexing.

    Each slice is a half-open index range ``(start, stop)`` along one of the
    first four axes of ``a``. Indices are taken modulo the axis length, so
    ranges that run past either end of an axis wrap around to the other side.

    Parameters:
        a (ndarray): The input multidimensional matrix.
        slice_x (tuple): The range of indices to slice along the x-axis.
        slice_y (tuple): The range of indices to slice along the y-axis.
        slice_theta (tuple): The range of indices to slice along the theta-axis.
        slice_r (tuple): The range of indices to slice along the r-axis.

    Returns:
        ndarray: The sliced multidimensional matrix. A range with
        ``start == stop`` yields a length-0 axis in the result.

    Raises:
        ValueError: If the slice values are not in ascending order.

    """
    slice_ranges = (slice_x, slice_y, slice_theta, slice_r)
    if any(bounds[0] > bounds[1] for bounds in slice_ranges):
        raise ValueError("Slice values must be in ascending order.")

    index_arrays = []
    for axis, bounds in enumerate(slice_ranges):
        # Explicit integer dtype: an empty range would otherwise become a
        # float64 array, which numpy refuses to use as an index.
        indices = np.arange(bounds[0], bounds[1], dtype=np.intp) % a.shape[axis]
        # Give every index array its own axis so the four arrays broadcast
        # to the full 4-D selection.
        broadcast_shape = [1] * len(slice_ranges)
        broadcast_shape[axis] = -1
        index_arrays.append(indices.reshape(broadcast_shape))
    return a[tuple(index_arrays)]
def get_bin_middle(bins: np.ndarray) -> np.ndarray:
    """Return the midpoint of every pair of consecutive bin edges.

    Args:
        bins: The input bin edges.

    Returns:
        The midpoints of the bins; one element fewer than ``bins`` along the
        first axis.

    """
    upper_edges = bins[1:]
    lower_edges = bins[:-1]
    return (upper_edges + lower_edges) / 2
def weighted_mean_of_two_matrices(
    first_matrix: np.ndarray,
    counts_first_matrix: np.ndarray,
    second_matrix: np.ndarray,
    counts_second_matrix: np.ndarray,
) -> np.ndarray:
    """
    Calculates the element-wise weighted mean of two matrices.

    Every element of the result is the mean of the corresponding elements of
    the two input matrices, weighted by their counts. NaN values are ignored,
    unless both values are NaN, in which case the result is NaN. Where both
    counts are zero the weights are zero, so the result is 0 there (or NaN
    when both values are NaN).

    Args:
        first_matrix (numpy.ndarray): First input matrix.
        counts_first_matrix (numpy.ndarray): Counts for the first input matrix.
        second_matrix (numpy.ndarray): Second input matrix.
        counts_second_matrix (numpy.ndarray): Counts for the second input
        matrix.

    Returns:
        numpy.ndarray: Weighted mean of the two input matrices.
    """
    vals_stack = np.stack([first_matrix, second_matrix], axis=0)
    counts_stack = np.stack(
        [counts_first_matrix, counts_second_matrix], axis=0
    )
    counts_sum = np.nansum(counts_stack, axis=0)
    counts_sum_stack = np.stack([counts_sum] * 2, axis=0)

    # ``where=`` without ``out=`` leaves the skipped entries uninitialized, so
    # divide into a zero-filled buffer. Its dtype is the one the division
    # produces by itself: the counts dtype for floating counts, float64
    # otherwise.
    if np.issubdtype(counts_stack.dtype, np.inexact):
        weight_dtype = counts_stack.dtype
    else:
        weight_dtype = np.float64
    weights = np.true_divide(
        counts_stack,
        counts_sum_stack,
        out=np.zeros(counts_stack.shape, dtype=weight_dtype),
        where=(counts_stack != 0) | (counts_sum_stack != 0),
    )
    weighted_vals = np.multiply(vals_stack, weights)
    weighted_mean = np.nansum(weighted_vals, axis=0)

    # If both values are nan, return nan
    both_nan = np.isnan(first_matrix) & np.isnan(second_matrix)
    return np.where(both_nan, np.nan, weighted_mean)
def test_weighted_mean_of_two_matrices(
    first_matrix: np.ndarray, counts_first_matrix: np.ndarray
) -> None:
    """
    Sanity check for weighted_mean_of_two_matrices.

    Averages the given matrix with itself, using the same counts for both
    operands, and asserts that the outcome equals the input matrix (NaNs are
    considered equal to each other). Prints a confirmation on success.

    Parameters:
        first_matrix (numpy.ndarray): The matrix to average with itself.
        counts_first_matrix (numpy.ndarray): The counts matrix corresponding to
        the input matrix.
    """
    # TODO: Move this to the tests.
    matrix_and_counts = (first_matrix, counts_first_matrix)
    averaged = weighted_mean_of_two_matrices(
        *matrix_and_counts, *matrix_and_counts
    )
    is_unchanged = np.array_equal(averaged, first_matrix, equal_nan=True)
    assert is_unchanged
    print("Test passed.")
176# def sample_nd_histogram(origin_histogram: np.ndarray, sample_count: int = 1)
177# -> np.ndarray:
178# """
179# Sample origin positions from heatmap with initial position.
181# Parameters:
182# origin_histogram (ndarray): The heatmap with initial position.
183# sample_count (int): The number of samples to generate. Default is 1.
185# Returns:
186# ndarray: The sampled origin positions.
188# """
189# flat_origin_histogram = origin_histogram.flatten()
190# flat_indices = range(len(flat_origin_histogram))
191# indices1d = np.random.choice(
192# a=flat_indices,
193# size=sample_count,
194# replace=True,
195# p=flat_origin_histogram / np.sum(flat_origin_histogram),
196# )
198# return np.unravel_index(indices1d, origin_histogram.shape)
def periodic_angular_conditions(
    angle: np.ndarray, angular_bins: np.ndarray
) -> np.ndarray:
    """Apply periodic boundary conditions to a list of angular coordinates.

    The angles are wrapped into the interval
    ``[angular_bins[0], angular_bins[0] + 2 * pi)``. The input is not
    modified; a new array is returned.

    Args:
        angle: A list of angular coordinates.
        angular_bins: An array of bin edges defining the angular grid cells.
            Only the first edge is used, as the lower bound of the interval.

    Returns:
        The angular coordinates cast to the angular grid.
    """
    lower_bound = angular_bins[0]
    # Computed as a single expression rather than with in-place operators, so
    # the caller's array stays untouched and integer inputs are accepted.
    return (angle - lower_bound) % (2 * np.pi) + lower_bound
def weighted_mean_of_matrix(
    field: np.ndarray, histogram: np.ndarray, axes: Tuple = (2, 3, 4)
) -> np.ndarray:
    """
    Average a matrix over the given axes, weighted by a histogram.

    The histogram is first normalized by its total sum; the field is then
    multiplied by these weights, summed over ``axes`` and divided by the sum
    of the weights over the same axes.

    Parameters:
        field (np.ndarray): The input matrix.
        histogram (np.ndarray): The histogram used for weighting.
        axes (Tuple): The axes along which to calculate the mean.
        Default is (2, 3, 4).

    Returns:
        np.ndarray: The weighted mean of the matrix.

    """
    normalized_histogram = histogram / np.sum(histogram)
    numerator = np.sum(field * normalized_histogram, axis=axes)
    denominator = np.sum(normalized_histogram, axis=axes)
    # No guard against a zero denominator: entries whose weights sum to zero
    # evaluate to 0 / 0.
    return numerator / denominator