Coverage for physped/core/langevin_model.py: 91%
75 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
1"""Langevin model class."""
3import logging
5import numpy as np
6import sdeint
8from physped.core.parametrize_potential import get_grid_indices
9from physped.core.piecewise_potential import PiecewisePotential
10from physped.utils.functions import (
11 cartesian_to_polar_coordinates,
12 periodic_angular_conditions,
13)
15log = logging.getLogger(__name__)
18SLOW_DERIVATIVES = {}
21def register_slow_derivative(name):
22 def decorator(fn):
23 SLOW_DERIVATIVES[name] = fn
24 return fn
26 return decorator
29@register_slow_derivative("low_pass_filter")
30def low_pass_filter(**kwargs) -> float:
31 # relaxation of slow dynamics toward fast dynamics
32 return -1 / kwargs["tau"] * (kwargs["slow"] - kwargs["fast"])
35@register_slow_derivative("use_fast_dynamics")
36def use_fast_dynamics(**kwargs) -> float:
37 return kwargs["fastdot"]
40@register_slow_derivative("integrate_slow_velocity")
41def use_slow_dynamics(**kwargs) -> float:
42 return kwargs["slowdot"]
45@register_slow_derivative("savgol_smoothing")
46def use_fast_dynamics2(**kwargs) -> float:
47 return kwargs["fastdot"]
50def get_slow_derivative(name: str):
51 return SLOW_DERIVATIVES.get(name)
54class LangevinModel:
55 """Langevin model class.
57 This class represents a Langevin model used for simulating pedestrian
58 trajectories. It contains methods for initializing the model, simulating
59 trajectories, and defining stopping conditions.
61 Attributes:
62 potential (PiecewisePotential): The piecewise potential object used
63 for modeling.
64 params (dict): A dictionary containing the model parameters.
66 """
68 def __init__(self, potential: PiecewisePotential, params: dict):
69 """Initialize Langevin model with parameters.
71 Args:
72 potential (PiecewisePotential): The piecewise potential object
73 used for modeling.
74 params (dict): A dictionary containing the model parameters.
76 """
77 self.potential = potential
78 self.params = params
79 # self.grid_counts = np.sum(potential.histogram, axis=(2, 3, 4))
80 # self.heatmap = np.sum(
81 # potential.histogram, axis=(2, 3, 4)
82 # ) / np.sum(potential.histogram)
84 def simulate(
85 self, X_0: np.ndarray, t_eval: np.ndarray = np.arange(0, 10, 0.1)
86 ) -> np.ndarray:
87 """
88 Simulates the Langevin model.
90 Parameters:
91 - X_0: Initial state of the system as a numpy array.
92 - t_eval: Time points at which to evaluate the solution.
93 Defaults to np.arange(0, 10, 0.1).
95 Returns:
96 - The simulated trajectory of the system as a numpy array.
97 """
98 return sdeint.itoSRI2(self.modelxy, self.noise, y0=X_0, tspan=t_eval)
100 def modelxy(self, state: np.ndarray, t) -> np.ndarray:
101 """
102 Calculate the derivatives of the Langevin model for the given state
103 variables.
105 Args:
106 X_0 (np.ndarray): Array of initial state variables
107 [xf, yf, uf, vf, xs, ys, us, vs].
109 Returns:
110 np.ndarray: Array of derivatives
111 [xfdot, yfdot, ufdot, vfdot, xsdot, ysdot, usdot, vsdot].
112 """
113 xf, yf, uf, vf, xs, ys, us, vs, t, k, Pid = state
114 xfdot = uf
115 yfdot = vf
116 k = np.max([k, 1])
117 dt = 1
118 dk = self.params.fps
120 if np.all(np.isnan(state)) or np.all(np.isinf(state)):
121 return np.zeros(len(state))
123 if self.particle_outside_grid(xf, yf):
124 log.info("Pid %s: left the grid at t = %.2f s", int(Pid), t)
125 return np.repeat(np.inf, len(state))
127 rs, thetas = cartesian_to_polar_coordinates(us, vs)
128 thetas = periodic_angular_conditions(
129 thetas, self.params.grid.bins["theta"]
130 )
131 slow_state = [xs, ys, rs, thetas, k]
132 slow_state_index = get_grid_indices(self.potential, slow_state)
134 xmean, ymean, umean, vmean = self.potential.parametrization[
135 slow_state_index[0],
136 slow_state_index[1],
137 slow_state_index[2],
138 slow_state_index[3],
139 slow_state_index[4],
140 :,
141 0,
142 ]
143 beta_x, beta_y, beta_u, beta_v = self.potential.parametrization[
144 slow_state_index[0],
145 slow_state_index[1],
146 slow_state_index[2],
147 slow_state_index[3],
148 slow_state_index[4],
149 :,
150 1,
151 ]
153 if np.all(
154 np.isnan(
155 [xmean, ymean, umean, vmean, beta_x, beta_y, beta_u, beta_v]
156 )
157 ):
158 # log.warning("Pid %s: reached hole in the potential
159 # at t = %.2f s", int(Pid), t)
160 # TODO : Find a fix to handle holes in the potential e.g.
161 # coarse graining, closest neighbour
163 return np.zeros(len(state)) * np.nan
165 V_x = beta_x * (xf - xmean)
166 V_y = beta_y * (yf - ymean)
168 V_u = beta_u * (uf - umean)
169 V_v = beta_v * (vf - vmean)
171 # acceleration fast modes (-grad V, random noise excluded)
172 ufdot = -V_x - V_u
173 vfdot = -V_y - V_v
175 slow_position_derivative_algorithm = get_slow_derivative(
176 self.params.model.slow_positions_algorithm
177 )
178 slow_velocity_derivative_algorithm = get_slow_derivative(
179 self.params.model.slow_velocities_algorithm
180 )
181 xsdot = slow_position_derivative_algorithm(
182 tau=self.params.model.taux,
183 slow=xs,
184 fast=xf,
185 slowdot=us,
186 fastdot=uf,
187 dt=self.params.model.dt,
188 )
189 ysdot = slow_position_derivative_algorithm(
190 tau=self.params.model.taux,
191 slow=ys,
192 fast=yf,
193 slowdot=vs,
194 fastdot=vf,
195 dt=self.params.model.dt,
196 )
197 usdot = slow_velocity_derivative_algorithm(
198 tau=self.params.model.tauu,
199 slow=us,
200 fast=uf,
201 fastdot=ufdot,
202 dt=self.params.model.dt,
203 )
204 vsdot = slow_velocity_derivative_algorithm(
205 tau=self.params.model.tauu,
206 slow=vs,
207 fast=vf,
208 fastdot=vfdot,
209 dt=self.params.model.dt,
210 )
212 return np.array(
213 [xfdot, yfdot, ufdot, vfdot, xsdot, ysdot, usdot, vsdot, dt, dk, 0]
214 )
216 def noise(self, X_0, t) -> np.ndarray:
217 """Return noise matrix.
219 Returns:
220 numpy.ndarray: A diagonal matrix representing the noise matrix.
221 The diagonal elements correspond to the independent driving Wiener
222 processes.
224 """
225 noise = self.params.model.sigma
226 return np.diag(
227 [0.0, 0.0, noise, noise, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
228 )
230 def particle_outside_grid(self, xf: float, yf: float) -> bool:
231 """Check if particle is outside the grid.
233 Args:
234 xf: The x-coordinate of the particle.
235 yf: The y-coordinate of the particle.
237 Returns:
238 Whether the particle is outside the grid.
239 """
240 c0 = xf < self.params.grid.x.min
241 c1 = xf > self.params.grid.x.max
242 c2 = yf < self.params.grid.y.min
243 c3 = yf > self.params.grid.y.max
244 return any([c0, c1, c2, c3])