Coverage for physped/core/slow_dynamics.py: 76%
68 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
1import logging
3import numpy as np
4import pandas as pd
5from omegaconf import DictConfig
6from scipy.signal import savgol_filter
7from tqdm import tqdm
8from tqdm.contrib.logging import logging_redirect_tqdm
10from physped.preprocessing.trajectories import (
11 transform_slow_velocity_to_polar_coordinates,
12)
13from physped.utils.functions import compose_functions
15log = logging.getLogger(__name__)
def integrate_slow_velocity_single_path(
    x0: float, us: pd.Series, dt: float
) -> np.ndarray:
    """Compute slow positions by integrating the slow velocity.

    The result follows the explicit recurrence ``xs[0] = x0`` and
    ``xs[i + 1] = xs[i] + dt * us[i]``, so the last velocity sample never
    contributes to the output.

    Args:
        x0: Initial position; becomes the first output sample.
        us: Slow velocity samples of a single trajectory.
        dt: Time step between consecutive samples.

    Returns:
        Float array with one position per velocity sample. An empty input
        yields an empty array.
    """
    # Force floating point: inheriting the dtype from integer-valued input
    # would silently truncate every integrated position.
    velocities = np.asarray(us, dtype=float)
    if velocities.size == 0:
        return np.zeros(0, dtype=float)

    # A running sum over [x0, dt*us[0], dt*us[1], ...] reproduces the
    # recurrence term by term without a Python-level loop.
    increments = np.empty_like(velocities)
    increments[0] = x0
    increments[1:] = dt * velocities[:-1]
    return np.cumsum(increments)
def low_pass_filter_single_path(
    fast: pd.Series, tau: float, dt: float
) -> np.ndarray:
    """Compute slow dynamics of a single trajectory with a low pass filter.

    The output follows ``slow[0] = fast[0]`` and
    ``slow[i + 1] = (1 - dt / tau) * slow[i] + dt / tau * fast[i]``.

    Args:
        fast: Fast (unfiltered) samples of a single trajectory.
        tau: Filter time constant, in the same unit as ``dt``.
        dt: Time step between consecutive samples.

    Returns:
        Float array with one filtered value per input sample. An empty input
        yields an empty array.
    """
    # Force floating point: inheriting the dtype from integer-valued input
    # would silently truncate every filtered sample.
    samples = np.asarray(fast, dtype=float)
    if samples.size < 2:
        # Nothing to filter: zero samples, or only the initial value.
        return samples.copy()

    alpha = dt / tau  # loop-invariant blending weight
    slow = np.zeros_like(samples)
    slow[0] = samples[0]
    for i, sample in enumerate(samples[:-1]):
        slow[i + 1] = (1 - alpha) * slow[i] + alpha * sample
    return slow
# Registry mapping an algorithm name to the callable registered under it.
SLOW_ALGORITHMS = {}


def register_slow_algorithm(name: str):
    """Return a decorator that registers a function under ``name``.

    The decorated function is stored in ``SLOW_ALGORITHMS`` and handed back
    unchanged. Registering a second function under the same name replaces
    the earlier entry.
    """

    def _store(algorithm):
        SLOW_ALGORITHMS[name] = algorithm
        return algorithm

    return _store
@register_slow_algorithm("low_pass_filter")
def low_pass_filter_all_paths(df: pd.DataFrame, **kwargs):
    """Apply the single-path low pass filter to every ``Pid`` group.

    Reads ``colname``, ``tau`` and ``dt`` from ``kwargs``; any other keyword
    arguments are ignored. Returns the result of the grouped ``transform``
    on column ``colname``.
    """

    def _filter_one_path(path):
        # Keyword lookups stay inside the callback, as in the lambda form.
        return low_pass_filter_single_path(path, kwargs["tau"], kwargs["dt"])

    per_pid = df.groupby("Pid")[kwargs["colname"]]
    return per_pid.transform(_filter_one_path)
@register_slow_algorithm("use_fast_dynamics")
def use_fast_dynamics(df: pd.DataFrame, **kwargs):
    """Return column ``kwargs["colname"]`` of ``df`` without any filtering.

    All other keyword arguments are ignored.
    """
    column = kwargs["colname"]
    return df[column]
@register_slow_algorithm("integrate_slow_velocity")
def integrate_slow_velocity(df: pd.DataFrame, **kwargs):
    """Integrate the slow velocity of every ``Pid`` group into positions.

    Reads three keyword arguments; any others are ignored:

    * ``colname``: column holding the positions; the first value of each
      group is used as the initial position of that group.
    * ``vel_col``: column holding the slow velocity to integrate.
    * ``dt``: time step passed on to the single-path integrator.

    Returns:
        A list with one value per row of ``df``, in the row order of ``df``.
        Rows that belong to no group (the default ``groupby`` drops missing
        ``Pid`` keys) are returned as NaN.
    """
    colname = kwargs["colname"]
    vel_col = kwargs["vel_col"]
    dt = kwargs["dt"]

    positions = df[colname].to_numpy()
    velocities = df[vel_col].to_numpy()
    slow = np.full(len(df), np.nan)

    # Map each Pid to the integer row positions of its samples. Results are
    # written back to those positions; concatenating them in group order
    # would misplace values whenever df is not already ordered by Pid.
    rows_by_pid = df.groupby("Pid").indices

    with logging_redirect_tqdm():
        for rows in tqdm(
            rows_by_pid.values(),
            desc=f"Computing {colname[0]}s with {vel_col}",
            unit="trajs",
            total=len(rows_by_pid),
            miniters=100,
        ):
            slow[rows] = integrate_slow_velocity_single_path(
                positions[rows[0]], velocities[rows], dt=dt
            )
    return slow.tolist()
@register_slow_algorithm("savgol_smoothing")
def savgol_smoothing(df: pd.DataFrame, **kwargs):
    """Smooth column ``kwargs["colname"]`` per ``Pid`` group.

    Each group is passed through ``scipy.signal.savgol_filter`` with the
    window length taken from ``kwargs["window_length"]``, a fixed polynomial
    order of 2, no differentiation and ``mode="interp"``. Other keyword
    arguments are ignored.
    """

    def _smooth_one_path(path):
        # Keyword lookups stay inside the callback, as in the lambda form.
        return savgol_filter(
            path,
            window_length=kwargs["window_length"],
            polyorder=2,
            deriv=0,
            mode="interp",
        )

    per_pid = df.groupby("Pid")[kwargs["colname"]]
    return per_pid.transform(_smooth_one_path)
def get_slow_algorithm(name: str):
    """Look up a registered slow-dynamics algorithm by name.

    Returns the callable stored in ``SLOW_ALGORITHMS`` under ``name``, or
    ``None`` when nothing is registered under that name.
    """
    return SLOW_ALGORITHMS[name] if name in SLOW_ALGORITHMS else None
def compute_slow_velocity(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add the slow velocity columns ``us`` and ``vs`` to ``df``.

    The algorithm named by ``config.params.model.slow_velocities_algorithm``
    is applied to the fast velocity columns ``uf`` and ``vf``. Every
    algorithm receives ``tau`` (``config.params.model["tauu"]``), ``dt``
    (``config.params.model["dt"]``) and ``window_length``
    (``config.params.minimum_trajectory_length``).

    Args:
        df: Trajectories; modified in place.
        config: Configuration holding the values listed above.

    Returns:
        The same ``df`` object with ``us`` and ``vs`` assigned.

    Raises:
        ValueError: If the configured algorithm name is not registered.
    """
    algorithm_name = config.params.model.slow_velocities_algorithm
    slow_velocity_algorithm = get_slow_algorithm(algorithm_name)
    if slow_velocity_algorithm is None:
        # Fail with an explicit message rather than the opaque
        # "'NoneType' object is not callable" raised further down.
        raise ValueError(
            f"Unknown slow velocity algorithm {algorithm_name!r}; "
            f"registered algorithms: {sorted(SLOW_ALGORITHMS)}"
        )
    log.info("Slow velocity algorithm: %s", slow_velocity_algorithm)

    shared_kwargs = {
        "tau": config.params.model["tauu"],
        "dt": config.params.model["dt"],
        "window_length": config.params.minimum_trajectory_length,
    }
    for fast_col, slow_col in (("uf", "us"), ("vf", "vs")):
        df[slow_col] = slow_velocity_algorithm(
            df, colname=fast_col, **shared_kwargs
        )
    return df
def compute_slow_position(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add the slow position columns ``xs`` and ``ys`` to ``df``.

    The algorithm named by ``config.params.model.slow_positions_algorithm``
    is applied to the fast position columns ``xf`` and ``yf``, with ``us``
    and ``vs`` passed as the matching velocity columns. Every algorithm
    receives ``tau`` (``config.params.model["taux"]``), ``dt``
    (``config.params.model["dt"]``) and ``window_length``
    (``config.params.minimum_trajectory_length``).

    Args:
        df: Trajectories; modified in place.
        config: Configuration holding the values listed above.

    Returns:
        The same ``df`` object with ``xs`` and ``ys`` assigned.

    Raises:
        ValueError: If the configured algorithm name is not registered.
    """
    algorithm_name = config.params.model.slow_positions_algorithm
    slow_position_algorithm = get_slow_algorithm(algorithm_name)
    if slow_position_algorithm is None:
        # Fail with an explicit message rather than the opaque
        # "'NoneType' object is not callable" raised further down.
        raise ValueError(
            f"Unknown slow position algorithm {algorithm_name!r}; "
            f"registered algorithms: {sorted(SLOW_ALGORITHMS)}"
        )
    log.info("Slow position algorithm: %s", slow_position_algorithm)

    shared_kwargs = {
        "tau": config.params.model["taux"],
        "dt": config.params.model["dt"],
        "window_length": config.params.minimum_trajectory_length,
    }
    columns = (("xf", "us", "xs"), ("yf", "vs", "ys"))
    for fast_col, vel_col, slow_col in columns:
        df[slow_col] = slow_position_algorithm(
            df, colname=fast_col, vel_col=vel_col, **shared_kwargs
        )
    return df
def compute_slow_dynamics(preprocessed_trajectories, config):
    """Run the slow-dynamics pipeline on the preprocessed trajectories.

    The stages handed to ``compose_functions`` are, in this order:
    ``compute_slow_velocity``,
    ``transform_slow_velocity_to_polar_coordinates`` and
    ``compute_slow_position``. The composed callable is invoked with the
    trajectories and ``config=config`` and its result is returned.
    """
    log.info("Compute slow dynamics")
    stages = (
        compute_slow_velocity,
        transform_slow_velocity_to_polar_coordinates,
        # apply_periodic_angular_conditions,
        compute_slow_position,
    )
    pipeline = compose_functions(*stages)
    return pipeline(preprocessed_trajectories, config=config)
168# def compute_slow_dynamics(df: pd.DataFrame, config: dict) -> pd.DataFrame:
169# dt = config.params.model["dt"]
170# tauu = config.params.model["tauu"]
171# window_length = config.params.minimum_trajectory_length
172# slow_velocity_algorithm = get_slow_algorithm(
173# config.params.model.slow_velocities_algorithm)
174# log.info("Slow velocity algorithm: %s", slow_velocity_algorithm)
176# df["us"] = slow_velocity_algorithm(df, colname="uf", tau=tauu, dt=dt,
177# window_length=window_length)
178# df["vs"] = slow_velocity_algorithm(df, colname="vf", tau=tauu, dt=dt,
179# window_length=window_length)
180# df = transform_slow_velocity_to_polar_coordinates(df, config)
181# df["thetas"] = periodic_angular_conditions(df["thetas"],
182# config.params.grid.bins["theta"])
184# taux = config.params.model["taux"]
185# slow_position_algorithm = get_slow_algorithm(
186# config.params.model.slow_positions_algorithm)
187# log.info("Slow position algorithm: %s", slow_position_algorithm)
189# df["xs"] = slow_position_algorithm(df, colname="xf", vel_col="us",
190# tau=taux, dt=dt, window_length=window_length)
191# df["ys"] = slow_position_algorithm(df, colname="yf", vel_col="vs",
192# tau=taux, dt=dt, window_length=window_length)
193# return df