Source code for physped.core.slow_dynamics

import logging

import numpy as np
import pandas as pd
from omegaconf import DictConfig
from scipy.signal import savgol_filter
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

from physped.preprocessing.trajectories import (
    transform_slow_velocity_to_polar_coordinates,
)
from physped.utils.functions import compose_functions

log = logging.getLogger(__name__)


[docs] def integrate_slow_velocity_single_path( x0: float, us: pd.Series, dt: float ) -> list: """Compute slow dynamics by integrating the slow velocity""" us = list(us) xs = np.zeros_like(us) xs[0] = x0 # Initialize xs(0) to x(0) for i in range(0, len(xs) - 1): xs[i + 1] = xs[i] + dt * us[i] return xs
[docs] def low_pass_filter_single_path( fast: pd.Series, tau: float, dt: float ) -> list: """Compute slow dynamics of single trajectory with low pass filter.""" fast = list(fast) slow = np.zeros_like(fast) slow[0] = fast[0] for i in range(len(fast) - 1): slow[i + 1] = (1 - dt / tau) * slow[i] + dt / tau * fast[i] return slow
SLOW_ALGORITHMS = {}
[docs] def register_slow_algorithm(name: str): def decorator(fn): SLOW_ALGORITHMS[name] = fn return fn return decorator
[docs] @register_slow_algorithm("low_pass_filter") def low_pass_filter_all_paths(df: pd.DataFrame, **kwargs): grouped_paths = df.groupby("Pid")[kwargs["colname"]] return grouped_paths.transform( lambda x: low_pass_filter_single_path(x, kwargs["tau"], kwargs["dt"]) )
[docs] @register_slow_algorithm("use_fast_dynamics") def use_fast_dynamics(df: pd.DataFrame, **kwargs): return df[kwargs["colname"]]
[docs] @register_slow_algorithm("integrate_slow_velocity") def integrate_slow_velocity(df: pd.DataFrame, **kwargs): slow = [] with logging_redirect_tqdm(): for _, traj_i in tqdm( df.groupby("Pid"), desc=f"Computing {kwargs['colname'][0]}s with {kwargs['vel_col']}", unit="trajs", total=len(df.Pid.unique()), miniters=100, ): x0 = traj_i[kwargs["colname"]].iloc[0] slow_path = integrate_slow_velocity_single_path( x0, traj_i[kwargs["vel_col"]], dt=kwargs["dt"] ) slow.extend(slow_path) return slow
[docs] @register_slow_algorithm("savgol_smoothing") def savgol_smoothing(df: pd.DataFrame, **kwargs): slow = df.groupby("Pid")[kwargs["colname"]].transform( lambda x: savgol_filter( x, window_length=kwargs["window_length"], polyorder=2, deriv=0, mode="interp", ) ) return slow
[docs] def get_slow_algorithm(name: str): return SLOW_ALGORITHMS.get(name)
[docs] def compute_slow_velocity( df: pd.DataFrame, config: DictConfig ) -> pd.DataFrame: slow_velocity_algorithm = get_slow_algorithm( config.params.model.slow_velocities_algorithm ) log.info("Slow velocity algorithm: %s", slow_velocity_algorithm) df["us"] = slow_velocity_algorithm( df, colname="uf", tau=config.params.model["tauu"], dt=config.params.model["dt"], window_length=config.params.minimum_trajectory_length, ) df["vs"] = slow_velocity_algorithm( df, colname="vf", tau=config.params.model["tauu"], dt=config.params.model["dt"], window_length=config.params.minimum_trajectory_length, ) return df
[docs] def compute_slow_position( df: pd.DataFrame, config: DictConfig ) -> pd.DataFrame: slow_position_algorithm = get_slow_algorithm( config.params.model.slow_positions_algorithm ) log.info("Slow position algorithm: %s", slow_position_algorithm) df["xs"] = slow_position_algorithm( df, colname="xf", vel_col="us", tau=config.params.model["taux"], dt=config.params.model["dt"], window_length=config.params.minimum_trajectory_length, ) df["ys"] = slow_position_algorithm( df, colname="yf", vel_col="vs", tau=config.params.model["taux"], dt=config.params.model["dt"], window_length=config.params.minimum_trajectory_length, ) return df
[docs] def compute_slow_dynamics(preprocessed_trajectories, config): log.info("Compute slow dynamics") compute_slow_dynamics_pipeline = compose_functions( compute_slow_velocity, transform_slow_velocity_to_polar_coordinates, # apply_periodic_angular_conditions, compute_slow_position, ) return compute_slow_dynamics_pipeline( preprocessed_trajectories, config=config )
# def compute_slow_dynamics(df: pd.DataFrame, config: dict) -> pd.DataFrame: # dt = config.params.model["dt"] # tauu = config.params.model["tauu"] # window_length = config.params.minimum_trajectory_length # slow_velocity_algorithm = get_slow_algorithm( # config.params.model.slow_velocities_algorithm) # log.info("Slow velocity algorithm: %s", slow_velocity_algorithm) # df["us"] = slow_velocity_algorithm(df, colname="uf", tau=tauu, dt=dt, # window_length=window_length) # df["vs"] = slow_velocity_algorithm(df, colname="vf", tau=tauu, dt=dt, # window_length=window_length) # df = transform_slow_velocity_to_polar_coordinates(df, config) # df["thetas"] = periodic_angular_conditions(df["thetas"], # config.params.grid.bins["theta"]) # taux = config.params.model["taux"] # slow_position_algorithm = get_slow_algorithm( # config.params.model.slow_positions_algorithm) # log.info("Slow position algorithm: %s", slow_position_algorithm) # df["xs"] = slow_position_algorithm(df, colname="xf", vel_col="us", # tau=taux, dt=dt, window_length=window_length) # df["ys"] = slow_position_algorithm(df, colname="yf", vel_col="vs", # tau=taux, dt=dt, window_length=window_length) # return df