Coverage for physped/core/pedestrian_initializer.py: 68%
34 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
1import logging
2from pathlib import Path
4import numpy as np
5import pandas as pd
7log = logging.getLogger(__name__)
10def get_initial_dynamics(config: dict) -> np.ndarray:
11 ntrajs = config.params.simulation.ntrajs
12 match config.params.simulation.initial_dynamics.get_from:
13 case "file":
14 folderpath = Path.cwd() # / "initial_dynamics"
15 filename = (
16 folderpath / config.params.simulation.initial_dynamics.filename
17 )
18 initial_dynamics = np.load(filename)
19 if initial_dynamics.shape[0] < ntrajs:
20 raise ValueError(
21 f"Number of trajectories in "
22 f"{config.params.simulation.initial_dynamics.filename} is"
23 f"less than {ntrajs}"
24 )
25 initial_dynamics = initial_dynamics[:ntrajs, :]
26 return initial_dynamics
27 case "point":
28 point = np.array(config.params.simulation.initial_dynamics.point)
29 initial_dynamics = np.tile(point, (ntrajs, 1))
30 return initial_dynamics
31 case _:
32 raise ValueError(
33 f"Unknown initial_dynamics.get_from: "
34 f"{config.params.simulation.initial_dynamics.get_from}"
35 )
38def initialize_pedestrians(initial_dynamics: np.ndarray) -> np.ndarray:
39 """Add time, step, and Pid to the initial dynamics
41 Args:
42 initial_dynamics: The initial dynamics of the pedestrians expressed as
43 [xf, yf, uf, vf, xs, ys, us, vs]
45 Returns:
46 The initial dynamics with the time, step, and Pid added
47 [xf, yf, uf, vf, xs, ys, us, vs, t, k, Pid]
48 """
49 pedestrian_initializations = np.hstack(
50 (
51 initial_dynamics,
52 np.zeros(
53 (initial_dynamics.shape[0], 2)
54 ), # Add start_time and start_k
55 np.arange(initial_dynamics.shape[0])[:, None], # Add Pids
56 )
57 )
58 return pedestrian_initializations
61def sample_dynamics_from_trajectories(
62 trajectories: pd.DataFrame,
63 n_trajs: int,
64 state_n: int,
65) -> np.ndarray:
66 if state_n < -1:
67 raise ValueError("State n must be >= -1")
68 elif state_n == -1:
69 # Sample from random point along each path
70 states_to_sample_from = (
71 trajectories.groupby("Pid")
72 .apply(lambda x: x.sample(1))
73 .reset_index(drop=True)
74 )
75 else: # state_n >= 0:
76 # Sample from state n along each path
77 states_to_sample_from = trajectories[
78 trajectories["k"] == state_n
79 ].copy()
81 sampled_states = states_to_sample_from[
82 ["xf", "yf", "uf", "vf", "xs", "ys", "us", "vs"]
83 ].sample(n=n_trajs, replace=True)
84 log.info("Sampled %d origins from the input trajectories.", n_trajs)
85 return sampled_states.to_numpy()
88# def potential_defined_at_slow_state(
89# paths: pd.DataFrame, piecewise_potential: PiecewisePotential
90# ) -> pd.DataFrame:
91# # REQUIRED: Dataframe must have a column with the slow grid indices
92# # TODO : Move this function to another module.
93# # TODO : Change the slow_grid_indices to lists rather than tuples
94# indices = np.array(list(paths["slow_grid_indices"]))
95# potential_defined = np.where(
96# np.isnan(piecewise_potential.parametrization), False, True
97# )
98# # All free parameters must be defined
99# potential_defined = np.all(potential_defined, axis=(-2, -1))
100# paths["potential_defined"] = potential_defined[
101# indices[:, 0],
102# indices[:, 1],
103# indices[:, 2],
104# indices[:, 3],
105# indices[:, 4],
106# ]
107# return paths