Coverage for physped/core/slow_dynamics.py: 76%

68 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-01 09:28 +0000

1import logging 

2 

3import numpy as np 

4import pandas as pd 

5from omegaconf import DictConfig 

6from scipy.signal import savgol_filter 

7from tqdm import tqdm 

8from tqdm.contrib.logging import logging_redirect_tqdm 

9 

10from physped.preprocessing.trajectories import ( 

11 transform_slow_velocity_to_polar_coordinates, 

12) 

13from physped.utils.functions import compose_functions 

14 

15log = logging.getLogger(__name__) 

16 

17 

def integrate_slow_velocity_single_path(
    x0: float, us: pd.Series, dt: float
) -> np.ndarray:
    """Compute slow dynamics by integrating the slow velocity.

    Applies the forward-Euler recursion ``xs[i + 1] = xs[i] + dt * us[i]``
    starting from ``xs[0] = x0``.

    Args:
        x0: Initial slow position.
        us: Slow velocity samples of a single trajectory.
        dt: Time step between consecutive samples.

    Returns:
        Float array with one slow position per velocity sample. The last
        velocity sample is never used. Empty input yields an empty array.
    """
    velocities = np.asarray(us, dtype=float)
    if velocities.size == 0:
        return np.zeros(0, dtype=float)
    # Always integrate in floating point: inheriting an integer dtype from
    # ``us`` would truncate both ``x0`` and every increment.
    steps = np.empty_like(velocities)
    steps[0] = x0
    steps[1:] = dt * velocities[:-1]
    # The running sum of [x0, dt*us[0], dt*us[1], ...] is the recursion above.
    return np.cumsum(steps)

28 

29 

def low_pass_filter_single_path(
    fast: pd.Series, tau: float, dt: float
) -> np.ndarray:
    """Compute slow dynamics of single trajectory with low pass filter.

    Applies the first-order recursion
    ``slow[i + 1] = (1 - dt / tau) * slow[i] + dt / tau * fast[i]``
    starting from ``slow[0] = fast[0]``.

    Args:
        fast: Fast-dynamics samples of a single trajectory.
        tau: Divisor of ``dt`` in the filter weight ``dt / tau``.
        dt: Time step between consecutive samples.

    Returns:
        Float array of the same length as ``fast``. Empty input yields an
        empty array.
    """
    # Work in floating point: an integer input would otherwise give an
    # integer output array and silently truncate the filtered values.
    samples = np.asarray(fast, dtype=float)
    slow = samples.copy()  # slow[0] == fast[0]; the rest is overwritten
    if samples.size < 2:
        return slow
    alpha = dt / tau  # loop-invariant filter weight
    for i, sample in enumerate(samples[:-1]):
        slow[i + 1] = (1 - alpha) * slow[i] + alpha * sample
    return slow

40 

41 

# Registry mapping an algorithm name to the callable registered under it.
SLOW_ALGORITHMS = {}


def register_slow_algorithm(name: str):
    """Build a decorator that records a function in ``SLOW_ALGORITHMS``.

    Args:
        name: Key under which the decorated function is stored.

    Returns:
        A decorator that stores its argument under ``name`` and hands the
        same function back unchanged.
    """

    def _register(algorithm):
        SLOW_ALGORITHMS[name] = algorithm
        return algorithm

    return _register

51 

52 

@register_slow_algorithm("low_pass_filter")
def low_pass_filter_all_paths(df: pd.DataFrame, **kwargs):
    """Low-pass filter one column of ``df`` separately for every ``Pid``.

    Keyword arguments read: ``colname`` (column to filter), ``tau`` and
    ``dt`` (forwarded to ``low_pass_filter_single_path``).

    Returns:
        The result of the grouped ``transform`` over the selected column.
    """

    def _filter_one_path(path):
        return low_pass_filter_single_path(path, kwargs["tau"], kwargs["dt"])

    paths = df.groupby("Pid")
    selected = paths[kwargs["colname"]]
    return selected.transform(_filter_one_path)

59 

60 

@register_slow_algorithm("use_fast_dynamics")
def use_fast_dynamics(df: pd.DataFrame, **kwargs):
    """Return the column named by ``kwargs["colname"]`` without filtering."""
    column = kwargs["colname"]
    return df[column]

64 

65 

@register_slow_algorithm("integrate_slow_velocity")
def integrate_slow_velocity(df: pd.DataFrame, **kwargs):
    """Integrate the slow velocity of every ``Pid`` into a slow position.

    Keyword arguments read: ``colname`` (column supplying the initial value
    of each path), ``vel_col`` (velocity column to integrate) and ``dt``.

    Returns:
        A float Series indexed like ``df`` with values in the row order of
        ``df``. Rows that do not belong to any group stay NaN.
    """
    position_col = kwargs["colname"]
    velocity_col = kwargs["vel_col"]
    dt = kwargs["dt"]

    start_values = df[position_col].to_numpy()
    velocities = df[velocity_col].to_numpy()
    slow = np.full(len(df), np.nan, dtype=float)

    # Positional row numbers of each path. Writing results back through
    # these keeps every value on its own row even when the paths are
    # interleaved or not sorted by Pid; concatenating per-group results
    # would only line up with ``df`` for contiguous, Pid-sorted rows.
    rows_per_path = df.groupby("Pid").indices
    with logging_redirect_tqdm():
        for rows in tqdm(
            rows_per_path.values(),
            desc=f"Computing {position_col[0]}s with {velocity_col}",
            unit="trajs",
            total=len(rows_per_path),
            miniters=100,
        ):
            slow[rows] = integrate_slow_velocity_single_path(
                start_values[rows[0]], velocities[rows], dt=dt
            )
    return pd.Series(slow, index=df.index)

83 

84 

@register_slow_algorithm("savgol_smoothing")
def savgol_smoothing(df: pd.DataFrame, **kwargs):
    """Smooth one column of ``df`` per ``Pid`` with a Savitzky-Golay filter.

    Keyword arguments read: ``colname`` (column to smooth) and
    ``window_length`` (passed to ``savgol_filter``). The filter is called
    with ``polyorder=2``, ``deriv=0`` and ``mode="interp"``.

    Returns:
        The result of the grouped ``transform`` over the selected column.
    """

    def _smooth_one_path(path):
        return savgol_filter(
            path,
            window_length=kwargs["window_length"],
            polyorder=2,
            deriv=0,
            mode="interp",
        )

    paths = df.groupby("Pid")
    return paths[kwargs["colname"]].transform(_smooth_one_path)

97 

98 

def get_slow_algorithm(name: str):
    """Look up a registered slow-dynamics algorithm by name.

    Returns:
        The callable stored in ``SLOW_ALGORITHMS`` under ``name``, or
        ``None`` when nothing is registered under that name.
    """
    try:
        return SLOW_ALGORITHMS[name]
    except KeyError:
        return None

101 

102 

def compute_slow_velocity(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add the slow velocity columns ``us`` and ``vs`` to ``df``.

    The algorithm is selected by
    ``config.params.model.slow_velocities_algorithm`` and applied to the
    ``uf`` and ``vf`` columns with ``tau=config.params.model["tauu"]``,
    ``dt=config.params.model["dt"]`` and
    ``window_length=config.params.minimum_trajectory_length``.

    Args:
        df: Trajectory data; modified in place.
        config: Configuration holding the parameters listed above.

    Returns:
        The same ``df`` object with ``us`` and ``vs`` assigned.

    Raises:
        ValueError: If no algorithm is registered under the configured name.
    """
    algorithm_name = config.params.model.slow_velocities_algorithm
    slow_velocity_algorithm = get_slow_algorithm(algorithm_name)
    if slow_velocity_algorithm is None:
        # Fail with the offending name instead of the opaque
        # "'NoneType' object is not callable" raised further down.
        raise ValueError(
            f"Unknown slow velocity algorithm {algorithm_name!r}; "
            f"registered algorithms: {sorted(SLOW_ALGORITHMS)}"
        )
    log.info("Slow velocity algorithm: %s", slow_velocity_algorithm)

    shared_kwargs = {
        "tau": config.params.model["tauu"],
        "dt": config.params.model["dt"],
        "window_length": config.params.minimum_trajectory_length,
    }
    for slow_col, fast_col in (("us", "uf"), ("vs", "vf")):
        df[slow_col] = slow_velocity_algorithm(
            df, colname=fast_col, **shared_kwargs
        )
    return df

126 

127 

def compute_slow_position(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add the slow position columns ``xs`` and ``ys`` to ``df``.

    The algorithm is selected by
    ``config.params.model.slow_positions_algorithm`` and applied to the
    ``xf`` and ``yf`` columns, with ``us`` and ``vs`` passed as the matching
    ``vel_col``, ``tau=config.params.model["taux"]``,
    ``dt=config.params.model["dt"]`` and
    ``window_length=config.params.minimum_trajectory_length``.

    Args:
        df: Trajectory data; modified in place.
        config: Configuration holding the parameters listed above.

    Returns:
        The same ``df`` object with ``xs`` and ``ys`` assigned.

    Raises:
        ValueError: If no algorithm is registered under the configured name.
    """
    algorithm_name = config.params.model.slow_positions_algorithm
    slow_position_algorithm = get_slow_algorithm(algorithm_name)
    if slow_position_algorithm is None:
        # Fail with the offending name instead of the opaque
        # "'NoneType' object is not callable" raised further down.
        raise ValueError(
            f"Unknown slow position algorithm {algorithm_name!r}; "
            f"registered algorithms: {sorted(SLOW_ALGORITHMS)}"
        )
    log.info("Slow position algorithm: %s", slow_position_algorithm)

    shared_kwargs = {
        "tau": config.params.model["taux"],
        "dt": config.params.model["dt"],
        "window_length": config.params.minimum_trajectory_length,
    }
    columns = (("xs", "xf", "us"), ("ys", "yf", "vs"))
    for slow_col, fast_col, velocity_col in columns:
        df[slow_col] = slow_position_algorithm(
            df, colname=fast_col, vel_col=velocity_col, **shared_kwargs
        )
    return df

153 

154 

def compute_slow_dynamics(preprocessed_trajectories, config):
    """Run the slow-dynamics pipeline on the preprocessed trajectories.

    The steps handed to ``compose_functions`` are, in order:
    ``compute_slow_velocity``, ``transform_slow_velocity_to_polar_coordinates``
    and ``compute_slow_position``.

    Returns:
        Whatever the composed pipeline returns when called with
        ``preprocessed_trajectories`` and ``config=config``.
    """
    log.info("Compute slow dynamics")
    steps = (
        compute_slow_velocity,
        transform_slow_velocity_to_polar_coordinates,
        # apply_periodic_angular_conditions is deliberately left out here.
        compute_slow_position,
    )
    pipeline = compose_functions(*steps)
    return pipeline(preprocessed_trajectories, config=config)

166 

167 

168# def compute_slow_dynamics(df: pd.DataFrame, config: dict) -> pd.DataFrame: 

169# dt = config.params.model["dt"] 

170# tauu = config.params.model["tauu"] 

171# window_length = config.params.minimum_trajectory_length 

172# slow_velocity_algorithm = get_slow_algorithm( 

173# config.params.model.slow_velocities_algorithm) 

174# log.info("Slow velocity algorithm: %s", slow_velocity_algorithm) 

175 

176# df["us"] = slow_velocity_algorithm(df, colname="uf", tau=tauu, dt=dt, 

177# window_length=window_length) 

178# df["vs"] = slow_velocity_algorithm(df, colname="vf", tau=tauu, dt=dt, 

179# window_length=window_length) 

180# df = transform_slow_velocity_to_polar_coordinates(df, config) 

181# df["thetas"] = periodic_angular_conditions(df["thetas"], 

182# config.params.grid.bins["theta"]) 

183 

184# taux = config.params.model["taux"] 

185# slow_position_algorithm = get_slow_algorithm( 

186# config.params.model.slow_positions_algorithm) 

187# log.info("Slow position algorithm: %s", slow_position_algorithm) 

188 

189# df["xs"] = slow_position_algorithm(df, colname="xf", vel_col="us", 

190# tau=taux, dt=dt, window_length=window_length) 

191# df["ys"] = slow_position_algorithm(df, colname="yf", vel_col="vs", 

192# tau=taux, dt=dt, window_length=window_length) 

193# return df