"""Langevin model class."""
import logging
import numpy as np
import sdeint
from physped.core.parametrize_potential import get_grid_indices
from physped.core.piecewise_potential import PiecewisePotential
from physped.utils.functions import (
cartesian_to_polar_coordinates,
periodic_angular_conditions,
)
log = logging.getLogger(__name__)
SLOW_DERIVATIVES = {}
[docs]
def register_slow_derivative(name):
def decorator(fn):
SLOW_DERIVATIVES[name] = fn
return fn
return decorator
[docs]
@register_slow_derivative("low_pass_filter")
def low_pass_filter(**kwargs) -> float:
# relaxation of slow dynamics toward fast dynamics
return -1 / kwargs["tau"] * (kwargs["slow"] - kwargs["fast"])
[docs]
@register_slow_derivative("use_fast_dynamics")
def use_fast_dynamics(**kwargs) -> float:
return kwargs["fastdot"]
[docs]
@register_slow_derivative("integrate_slow_velocity")
def use_slow_dynamics(**kwargs) -> float:
return kwargs["slowdot"]
[docs]
@register_slow_derivative("savgol_smoothing")
def use_fast_dynamics2(**kwargs) -> float:
return kwargs["fastdot"]
[docs]
def get_slow_derivative(name: str):
return SLOW_DERIVATIVES.get(name)
[docs]
class LangevinModel:
"""Langevin model class.
This class represents a Langevin model used for simulating pedestrian
trajectories. It contains methods for initializing the model, simulating
trajectories, and defining stopping conditions.
Attributes:
potential (PiecewisePotential): The piecewise potential object used
for modeling.
params (dict): A dictionary containing the model parameters.
"""
def __init__(self, potential: PiecewisePotential, params: dict):
"""Initialize Langevin model with parameters.
Args:
potential (PiecewisePotential): The piecewise potential object
used for modeling.
params (dict): A dictionary containing the model parameters.
"""
self.potential = potential
self.params = params
# self.grid_counts = np.sum(potential.histogram, axis=(2, 3, 4))
# self.heatmap = np.sum(
# potential.histogram, axis=(2, 3, 4)
# ) / np.sum(potential.histogram)
[docs]
def simulate(
self, X_0: np.ndarray, t_eval: np.ndarray = np.arange(0, 10, 0.1)
) -> np.ndarray:
"""
Simulates the Langevin model.
Parameters:
- X_0: Initial state of the system as a numpy array.
- t_eval: Time points at which to evaluate the solution.
Defaults to np.arange(0, 10, 0.1).
Returns:
- The simulated trajectory of the system as a numpy array.
"""
return sdeint.itoSRI2(self.modelxy, self.noise, y0=X_0, tspan=t_eval)
[docs]
def modelxy(self, state: np.ndarray, t) -> np.ndarray:
"""
Calculate the derivatives of the Langevin model for the given state
variables.
Args:
X_0 (np.ndarray): Array of initial state variables
[xf, yf, uf, vf, xs, ys, us, vs].
Returns:
np.ndarray: Array of derivatives
[xfdot, yfdot, ufdot, vfdot, xsdot, ysdot, usdot, vsdot].
"""
xf, yf, uf, vf, xs, ys, us, vs, t, k, Pid = state
xfdot = uf
yfdot = vf
k = np.max([k, 1])
dt = 1
dk = self.params.fps
if np.all(np.isnan(state)) or np.all(np.isinf(state)):
return np.zeros(len(state))
if self.particle_outside_grid(xf, yf):
log.info("Pid %s: left the grid at t = %.2f s", int(Pid), t)
return np.repeat(np.inf, len(state))
rs, thetas = cartesian_to_polar_coordinates(us, vs)
thetas = periodic_angular_conditions(
thetas, self.params.grid.bins["theta"]
)
slow_state = [xs, ys, rs, thetas, k]
slow_state_index = get_grid_indices(self.potential, slow_state)
xmean, ymean, umean, vmean = self.potential.parametrization[
slow_state_index[0],
slow_state_index[1],
slow_state_index[2],
slow_state_index[3],
slow_state_index[4],
:,
0,
]
beta_x, beta_y, beta_u, beta_v = self.potential.parametrization[
slow_state_index[0],
slow_state_index[1],
slow_state_index[2],
slow_state_index[3],
slow_state_index[4],
:,
1,
]
if np.all(
np.isnan(
[xmean, ymean, umean, vmean, beta_x, beta_y, beta_u, beta_v]
)
):
# log.warning("Pid %s: reached hole in the potential
# at t = %.2f s", int(Pid), t)
# TODO : Find a fix to handle holes in the potential e.g.
# coarse graining, closest neighbour
return np.zeros(len(state)) * np.nan
V_x = beta_x * (xf - xmean)
V_y = beta_y * (yf - ymean)
V_u = beta_u * (uf - umean)
V_v = beta_v * (vf - vmean)
# acceleration fast modes (-grad V, random noise excluded)
ufdot = -V_x - V_u
vfdot = -V_y - V_v
slow_position_derivative_algorithm = get_slow_derivative(
self.params.model.slow_positions_algorithm
)
slow_velocity_derivative_algorithm = get_slow_derivative(
self.params.model.slow_velocities_algorithm
)
xsdot = slow_position_derivative_algorithm(
tau=self.params.model.taux,
slow=xs,
fast=xf,
slowdot=us,
fastdot=uf,
dt=self.params.model.dt,
)
ysdot = slow_position_derivative_algorithm(
tau=self.params.model.taux,
slow=ys,
fast=yf,
slowdot=vs,
fastdot=vf,
dt=self.params.model.dt,
)
usdot = slow_velocity_derivative_algorithm(
tau=self.params.model.tauu,
slow=us,
fast=uf,
fastdot=ufdot,
dt=self.params.model.dt,
)
vsdot = slow_velocity_derivative_algorithm(
tau=self.params.model.tauu,
slow=vs,
fast=vf,
fastdot=vfdot,
dt=self.params.model.dt,
)
return np.array(
[xfdot, yfdot, ufdot, vfdot, xsdot, ysdot, usdot, vsdot, dt, dk, 0]
)
[docs]
def noise(self, X_0, t) -> np.ndarray:
"""Return noise matrix.
Returns:
numpy.ndarray: A diagonal matrix representing the noise matrix.
The diagonal elements correspond to the independent driving Wiener
processes.
"""
noise = self.params.model.sigma
return np.diag(
[0.0, 0.0, noise, noise, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
)
[docs]
def particle_outside_grid(self, xf: float, yf: float) -> bool:
"""Check if particle is outside the grid.
Args:
xf: The x-coordinate of the particle.
yf: The y-coordinate of the particle.
Returns:
Whether the particle is outside the grid.
"""
c0 = xf < self.params.grid.x.min
c1 = xf > self.params.grid.x.max
c2 = yf < self.params.grid.y.min
c3 = yf > self.params.grid.y.max
return any([c0, c1, c2, c3])