Coverage for physped/utils/functions.py: 44%

54 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-01 09:28 +0000

1import logging 

2from functools import reduce 

3from typing import Any, Callable, Tuple 

4 

5import numpy as np 

6 

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Type alias: a callable that takes one positional argument of any type and
# returns a value of any type.
Composable = Callable[[Any], Any]

10 

11 

def compose_functions(*functions: Composable) -> Composable:
    """Chain several callables into one that applies them left to right.

    Args:
        *functions: Callables applied in the order given. Each one receives
            the result of the previous callable as its first positional
            argument.

    Returns:
        A callable ``composed(x, **kwargs)``. The same keyword arguments are
        forwarded unchanged to every function in the chain. When no functions
        are given, ``x`` is returned as is.
    """

    def composed(x, **kwargs):
        result = x
        for step in functions:
            result = step(result, **kwargs)
        return result

    return composed

16 

17 

def cartesian_to_polar_coordinates(x: float, y: float) -> tuple:
    """Express the cartesian point (x, y) in polar form.

    Parameters:
    - x: The x-coordinate.
    - y: The y-coordinate.

    Returns:
    - A tuple ``(rho, phi)``: the distance to the origin and the angle in
      radians as computed by ``np.arctan2(y, x)``.
    """
    radius_squared = x**2 + y**2
    return (np.sqrt(radius_squared), np.arctan2(y, x))

31 

32 

def polar_to_cartesian_coordinates(rho: float, phi: float) -> tuple:
    """Express the polar point (rho, phi) in cartesian form.

    Parameters:
    rho: The radial distance from the origin to the point.
    phi: The angle in radians.

    Returns:
    - A tuple ``(x, y)`` containing the cartesian coordinates.
    """
    return (rho * np.cos(phi), rho * np.sin(phi))

46 

47 

def get_slice_of_multidimensional_matrix(
    a: np.ndarray,
    slice_x: Tuple,
    slice_y: Tuple,
    slice_theta: Tuple,
    slice_r: Tuple,
) -> np.ndarray:
    """
    Get a slice of a multidimensional matrix with periodic wrap-around.

    Indices outside an axis' bounds wrap around: each index is taken modulo
    the length of its axis, so e.g. ``(-1, 1)`` selects the last and the
    first element of that axis.

    Parameters:
        a (ndarray): The input multidimensional matrix (at least 4 axes).
        slice_x (tuple): The (start, stop) integer indices along the first
            axis; stop is exclusive.
        slice_y (tuple): The (start, stop) integer indices along the second
            axis; stop is exclusive.
        slice_theta (tuple): The (start, stop) integer indices along the
            third axis; stop is exclusive.
        slice_r (tuple): The (start, stop) integer indices along the fourth
            axis; stop is exclusive.

    Returns:
        ndarray: The sliced multidimensional matrix. An empty range
        (start == stop) yields an axis of length zero.

    Raises:
        ValueError: If the slice values are not in ascending order.

    """
    slice_ranges = (slice_x, slice_y, slice_theta, slice_r)
    for slice_range in slice_ranges:
        if slice_range[0] > slice_range[1]:
            raise ValueError("Slice values must be in ascending order.")

    # np.arange always yields an integer array for integer bounds, even when
    # the range is empty; np.array(range(n, n)) would be float64 and cannot
    # be used as an index.
    wrapped_indices = [
        np.arange(slice_range[0], slice_range[1]) % a.shape[axis]
        for axis, slice_range in enumerate(slice_ranges)
    ]
    # np.ix_ reshapes the 1-D index arrays so they broadcast to the outer
    # product of the four index sets.
    return a[np.ix_(*wrapped_indices)]

92 

93 

def get_bin_middle(bins: np.ndarray) -> np.ndarray:
    """Compute the midpoint between each pair of consecutive bin edges.

    Args:
        bins: The bin edges.

    Returns:
        The midpoints; one element shorter than ``bins``.

    """
    lower_edges = bins[:-1]
    upper_edges = bins[1:]
    return (upper_edges + lower_edges) / 2

105 

106 

def weighted_mean_of_two_matrices(
    first_matrix: np.ndarray,
    counts_first_matrix: np.ndarray,
    second_matrix: np.ndarray,
    counts_second_matrix: np.ndarray,
) -> np.ndarray:
    """
    Calculates the weighted mean of two matrices.

    Each matrix is weighted element-wise by its share of the total counts.
    NaN values are ignored in the sum, except where both inputs are NaN,
    in which case the result is NaN. Where both counts are zero the weights
    are zero, so non-NaN values there yield a mean of 0.

    Args:
        first_matrix (numpy.ndarray): First input matrix.
        counts_first_matrix (numpy.ndarray): Counts for the first input matrix.
        second_matrix (numpy.ndarray): Second input matrix.
        counts_second_matrix (numpy.ndarray): Counts for the second input
        matrix.

    Returns:
        numpy.ndarray: Weighted mean of the two input matrices.
    """
    values = np.stack([first_matrix, second_matrix], axis=0)
    counts = np.stack([counts_first_matrix, counts_second_matrix], axis=0)
    # Broadcasts against the leading (stacking) axis of ``counts``.
    total_counts = np.nansum(counts, axis=0)

    # A ufunc called with ``where=`` leaves masked-out entries of its output
    # untouched. Without an explicit ``out`` those entries are uninitialized
    # memory, so start from zeros to get a well-defined weight of 0 there.
    weights = np.true_divide(
        counts,
        total_counts,
        out=np.zeros(counts.shape, dtype=float),
        where=(counts != 0) | (total_counts != 0),
    )
    weighted_mean = np.nansum(values * weights, axis=0)

    # If both values are nan, return nan (nansum alone would give 0).
    both_nan = np.isnan(first_matrix) & np.isnan(second_matrix)
    return np.where(both_nan, np.nan, weighted_mean)

152 

153 

def test_weighted_mean_of_two_matrices(
    first_matrix: np.ndarray, counts_first_matrix: np.ndarray
) -> None:
    """
    Sanity check for weighted_mean_of_two_matrices.

    Averages the given matrix with itself, using the same counts for both
    operands, and asserts that the result equals the input matrix (NaNs at
    the same positions compare as equal). Prints a confirmation on success.

    Parameters:
        first_matrix (numpy.ndarray): The matrix used for both operands.
        counts_first_matrix (numpy.ndarray): The counts matrix corresponding
        to the input matrix, used for both operands.

    Raises:
        AssertionError: If the weighted mean differs from ``first_matrix``.
    """
    # TODO: Move this to the tests.
    operand = (first_matrix, counts_first_matrix)
    averaged_with_itself = weighted_mean_of_two_matrices(*operand, *operand)
    is_unchanged = np.array_equal(
        averaged_with_itself, first_matrix, equal_nan=True
    )
    assert is_unchanged
    print("Test passed.")

174 

175 

176# def sample_nd_histogram(origin_histogram: np.ndarray, sample_count: int = 1) 

177# -> np.ndarray: 

178# """ 

179# Sample origin positions from heatmap with initial position. 

180 

181# Parameters: 

182# origin_histogram (ndarray): The heatmap with initial position. 

183# sample_count (int): The number of samples to generate. Default is 1. 

184 

185# Returns: 

186# ndarray: The sampled origin positions. 

187 

188# """ 

189# flat_origin_histogram = origin_histogram.flatten() 

190# flat_indices = range(len(flat_origin_histogram)) 

191# indices1d = np.random.choice( 

192# a=flat_indices, 

193# size=sample_count, 

194# replace=True, 

195# p=flat_origin_histogram / np.sum(flat_origin_histogram), 

196# ) 

197 

198# return np.unravel_index(indices1d, origin_histogram.shape) 

199 

200 

def periodic_angular_conditions(
    angle: np.ndarray, angular_bins: np.ndarray
) -> np.ndarray:
    """Apply periodic boundary conditions to a list of angular coordinates.

    Each angle is shifted by a multiple of 2*pi so that it falls in the
    interval starting at ``angular_bins[0]`` and spanning 2*pi (up to
    floating-point rounding).

    Args:
        angle: A list of angular coordinates. It is not modified.
        angular_bins: An array of bin edges defining the angular grid cells.
            Only the first edge is used, as the lower bound of the interval.

    Returns:
        A new array with the angular coordinates cast to the angular grid.
    """
    lower_edge = angular_bins[0]
    # Compute out of place: in-place operators (``angle -= ...``) would shift
    # the caller's array and fail for integer-typed input.
    return (angle - lower_edge) % (2 * np.pi) + lower_edge

217 

218 

def weighted_mean_of_matrix(
    field: np.ndarray, histogram: np.ndarray, axes: Tuple = (2, 3, 4)
) -> np.ndarray:
    """
    Average a field over the given axes, weighting each cell by a histogram.

    The histogram is first normalised by its total sum; the weighted sum of
    the field along ``axes`` is then divided by the sum of the weights along
    the same axes.

    Parameters:
        field (np.ndarray): The input matrix.
        histogram (np.ndarray): The histogram used for weighting.
        axes (Tuple): The axes along which to calculate the mean.
        Default is (2, 3, 4).

    Returns:
        np.ndarray: The weighted mean of the matrix, with ``axes`` reduced.

    """
    normalised_histogram = histogram / np.sum(histogram)

    numerator = np.sum(field * normalised_histogram, axis=axes)
    denominator = np.sum(normalised_histogram, axis=axes)

    # NOTE: positions whose weights sum to zero divide by zero here. An
    # earlier nansum-based variant divided by np.inf at such positions
    # instead.
    return numerator / denominator