Coverage for physped/core/langevin_model.py: 91%

75 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-01 09:28 +0000

1"""Langevin model class.""" 

2 

3import logging 

4 

5import numpy as np 

6import sdeint 

7 

8from physped.core.parametrize_potential import get_grid_indices 

9from physped.core.piecewise_potential import PiecewisePotential 

10from physped.utils.functions import ( 

11 cartesian_to_polar_coordinates, 

12 periodic_angular_conditions, 

13) 

14 

15log = logging.getLogger(__name__) 

16 

17 

18SLOW_DERIVATIVES = {} 

19 

20 

21def register_slow_derivative(name): 

22 def decorator(fn): 

23 SLOW_DERIVATIVES[name] = fn 

24 return fn 

25 

26 return decorator 

27 

28 

29@register_slow_derivative("low_pass_filter") 

30def low_pass_filter(**kwargs) -> float: 

31 # relaxation of slow dynamics toward fast dynamics 

32 return -1 / kwargs["tau"] * (kwargs["slow"] - kwargs["fast"]) 

33 

34 

35@register_slow_derivative("use_fast_dynamics") 

36def use_fast_dynamics(**kwargs) -> float: 

37 return kwargs["fastdot"] 

38 

39 

40@register_slow_derivative("integrate_slow_velocity") 

41def use_slow_dynamics(**kwargs) -> float: 

42 return kwargs["slowdot"] 

43 

44 

45@register_slow_derivative("savgol_smoothing") 

46def use_fast_dynamics2(**kwargs) -> float: 

47 return kwargs["fastdot"] 

48 

49 

50def get_slow_derivative(name: str): 

51 return SLOW_DERIVATIVES.get(name) 

52 

53 

54class LangevinModel: 

55 """Langevin model class. 

56 

57 This class represents a Langevin model used for simulating pedestrian 

58 trajectories. It contains methods for initializing the model, simulating 

59 trajectories, and defining stopping conditions. 

60 

61 Attributes: 

62 potential (PiecewisePotential): The piecewise potential object used 

63 for modeling. 

64 params (dict): A dictionary containing the model parameters. 

65 

66 """ 

67 

68 def __init__(self, potential: PiecewisePotential, params: dict): 

69 """Initialize Langevin model with parameters. 

70 

71 Args: 

72 potential (PiecewisePotential): The piecewise potential object 

73 used for modeling. 

74 params (dict): A dictionary containing the model parameters. 

75 

76 """ 

77 self.potential = potential 

78 self.params = params 

79 # self.grid_counts = np.sum(potential.histogram, axis=(2, 3, 4)) 

80 # self.heatmap = np.sum( 

81 # potential.histogram, axis=(2, 3, 4) 

82 # ) / np.sum(potential.histogram) 

83 

84 def simulate( 

85 self, X_0: np.ndarray, t_eval: np.ndarray = np.arange(0, 10, 0.1) 

86 ) -> np.ndarray: 

87 """ 

88 Simulates the Langevin model. 

89 

90 Parameters: 

91 - X_0: Initial state of the system as a numpy array. 

92 - t_eval: Time points at which to evaluate the solution. 

93 Defaults to np.arange(0, 10, 0.1). 

94 

95 Returns: 

96 - The simulated trajectory of the system as a numpy array. 

97 """ 

98 return sdeint.itoSRI2(self.modelxy, self.noise, y0=X_0, tspan=t_eval) 

99 

100 def modelxy(self, state: np.ndarray, t) -> np.ndarray: 

101 """ 

102 Calculate the derivatives of the Langevin model for the given state 

103 variables. 

104 

105 Args: 

106 X_0 (np.ndarray): Array of initial state variables 

107 [xf, yf, uf, vf, xs, ys, us, vs]. 

108 

109 Returns: 

110 np.ndarray: Array of derivatives 

111 [xfdot, yfdot, ufdot, vfdot, xsdot, ysdot, usdot, vsdot]. 

112 """ 

113 xf, yf, uf, vf, xs, ys, us, vs, t, k, Pid = state 

114 xfdot = uf 

115 yfdot = vf 

116 k = np.max([k, 1]) 

117 dt = 1 

118 dk = self.params.fps 

119 

120 if np.all(np.isnan(state)) or np.all(np.isinf(state)): 

121 return np.zeros(len(state)) 

122 

123 if self.particle_outside_grid(xf, yf): 

124 log.info("Pid %s: left the grid at t = %.2f s", int(Pid), t) 

125 return np.repeat(np.inf, len(state)) 

126 

127 rs, thetas = cartesian_to_polar_coordinates(us, vs) 

128 thetas = periodic_angular_conditions( 

129 thetas, self.params.grid.bins["theta"] 

130 ) 

131 slow_state = [xs, ys, rs, thetas, k] 

132 slow_state_index = get_grid_indices(self.potential, slow_state) 

133 

134 xmean, ymean, umean, vmean = self.potential.parametrization[ 

135 slow_state_index[0], 

136 slow_state_index[1], 

137 slow_state_index[2], 

138 slow_state_index[3], 

139 slow_state_index[4], 

140 :, 

141 0, 

142 ] 

143 beta_x, beta_y, beta_u, beta_v = self.potential.parametrization[ 

144 slow_state_index[0], 

145 slow_state_index[1], 

146 slow_state_index[2], 

147 slow_state_index[3], 

148 slow_state_index[4], 

149 :, 

150 1, 

151 ] 

152 

153 if np.all( 

154 np.isnan( 

155 [xmean, ymean, umean, vmean, beta_x, beta_y, beta_u, beta_v] 

156 ) 

157 ): 

158 # log.warning("Pid %s: reached hole in the potential 

159 # at t = %.2f s", int(Pid), t) 

160 # TODO : Find a fix to handle holes in the potential e.g. 

161 # coarse graining, closest neighbour 

162 

163 return np.zeros(len(state)) * np.nan 

164 

165 V_x = beta_x * (xf - xmean) 

166 V_y = beta_y * (yf - ymean) 

167 

168 V_u = beta_u * (uf - umean) 

169 V_v = beta_v * (vf - vmean) 

170 

171 # acceleration fast modes (-grad V, random noise excluded) 

172 ufdot = -V_x - V_u 

173 vfdot = -V_y - V_v 

174 

175 slow_position_derivative_algorithm = get_slow_derivative( 

176 self.params.model.slow_positions_algorithm 

177 ) 

178 slow_velocity_derivative_algorithm = get_slow_derivative( 

179 self.params.model.slow_velocities_algorithm 

180 ) 

181 xsdot = slow_position_derivative_algorithm( 

182 tau=self.params.model.taux, 

183 slow=xs, 

184 fast=xf, 

185 slowdot=us, 

186 fastdot=uf, 

187 dt=self.params.model.dt, 

188 ) 

189 ysdot = slow_position_derivative_algorithm( 

190 tau=self.params.model.taux, 

191 slow=ys, 

192 fast=yf, 

193 slowdot=vs, 

194 fastdot=vf, 

195 dt=self.params.model.dt, 

196 ) 

197 usdot = slow_velocity_derivative_algorithm( 

198 tau=self.params.model.tauu, 

199 slow=us, 

200 fast=uf, 

201 fastdot=ufdot, 

202 dt=self.params.model.dt, 

203 ) 

204 vsdot = slow_velocity_derivative_algorithm( 

205 tau=self.params.model.tauu, 

206 slow=vs, 

207 fast=vf, 

208 fastdot=vfdot, 

209 dt=self.params.model.dt, 

210 ) 

211 

212 return np.array( 

213 [xfdot, yfdot, ufdot, vfdot, xsdot, ysdot, usdot, vsdot, dt, dk, 0] 

214 ) 

215 

216 def noise(self, X_0, t) -> np.ndarray: 

217 """Return noise matrix. 

218 

219 Returns: 

220 numpy.ndarray: A diagonal matrix representing the noise matrix. 

221 The diagonal elements correspond to the independent driving Wiener 

222 processes. 

223 

224 """ 

225 noise = self.params.model.sigma 

226 return np.diag( 

227 [0.0, 0.0, noise, noise, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] 

228 ) 

229 

230 def particle_outside_grid(self, xf: float, yf: float) -> bool: 

231 """Check if particle is outside the grid. 

232 

233 Args: 

234 xf: The x-coordinate of the particle. 

235 yf: The y-coordinate of the particle. 

236 

237 Returns: 

238 Whether the particle is outside the grid. 

239 """ 

240 c0 = xf < self.params.grid.x.min 

241 c1 = xf > self.params.grid.x.max 

242 c2 = yf < self.params.grid.y.min 

243 c3 = yf > self.params.grid.y.max 

244 return any([c0, c1, c2, c3])