Coverage for physped/core/pedestrian_initializer.py: 68%

34 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-01 09:28 +0000

1import logging 

2from pathlib import Path 

3 

4import numpy as np 

5import pandas as pd 

6 

7log = logging.getLogger(__name__) 

8 

9 

10def get_initial_dynamics(config: dict) -> np.ndarray: 

11 ntrajs = config.params.simulation.ntrajs 

12 match config.params.simulation.initial_dynamics.get_from: 

13 case "file": 

14 folderpath = Path.cwd() # / "initial_dynamics" 

15 filename = ( 

16 folderpath / config.params.simulation.initial_dynamics.filename 

17 ) 

18 initial_dynamics = np.load(filename) 

19 if initial_dynamics.shape[0] < ntrajs: 

20 raise ValueError( 

21 f"Number of trajectories in " 

22 f"{config.params.simulation.initial_dynamics.filename} is" 

23 f"less than {ntrajs}" 

24 ) 

25 initial_dynamics = initial_dynamics[:ntrajs, :] 

26 return initial_dynamics 

27 case "point": 

28 point = np.array(config.params.simulation.initial_dynamics.point) 

29 initial_dynamics = np.tile(point, (ntrajs, 1)) 

30 return initial_dynamics 

31 case _: 

32 raise ValueError( 

33 f"Unknown initial_dynamics.get_from: " 

34 f"{config.params.simulation.initial_dynamics.get_from}" 

35 ) 

36 

37 

38def initialize_pedestrians(initial_dynamics: np.ndarray) -> np.ndarray: 

39 """Add time, step, and Pid to the initial dynamics 

40 

41 Args: 

42 initial_dynamics: The initial dynamics of the pedestrians expressed as 

43 [xf, yf, uf, vf, xs, ys, us, vs] 

44 

45 Returns: 

46 The initial dynamics with the time, step, and Pid added 

47 [xf, yf, uf, vf, xs, ys, us, vs, t, k, Pid] 

48 """ 

49 pedestrian_initializations = np.hstack( 

50 ( 

51 initial_dynamics, 

52 np.zeros( 

53 (initial_dynamics.shape[0], 2) 

54 ), # Add start_time and start_k 

55 np.arange(initial_dynamics.shape[0])[:, None], # Add Pids 

56 ) 

57 ) 

58 return pedestrian_initializations 

59 

60 

61def sample_dynamics_from_trajectories( 

62 trajectories: pd.DataFrame, 

63 n_trajs: int, 

64 state_n: int, 

65) -> np.ndarray: 

66 if state_n < -1: 

67 raise ValueError("State n must be >= -1") 

68 elif state_n == -1: 

69 # Sample from random point along each path 

70 states_to_sample_from = ( 

71 trajectories.groupby("Pid") 

72 .apply(lambda x: x.sample(1)) 

73 .reset_index(drop=True) 

74 ) 

75 else: # state_n >= 0: 

76 # Sample from state n along each path 

77 states_to_sample_from = trajectories[ 

78 trajectories["k"] == state_n 

79 ].copy() 

80 

81 sampled_states = states_to_sample_from[ 

82 ["xf", "yf", "uf", "vf", "xs", "ys", "us", "vs"] 

83 ].sample(n=n_trajs, replace=True) 

84 log.info("Sampled %d origins from the input trajectories.", n_trajs) 

85 return sampled_states.to_numpy() 

86 

87 

88# def potential_defined_at_slow_state( 

89# paths: pd.DataFrame, piecewise_potential: PiecewisePotential 

90# ) -> pd.DataFrame: 

91# # REQUIRED: Dataframe must have a column with the slow grid indices 

92# # TODO : Move this function to another module. 

93# # TODO : Change the slow_grid_indices to lists rather than tuples 

94# indices = np.array(list(paths["slow_grid_indices"])) 

95# potential_defined = np.where( 

96# np.isnan(piecewise_potential.parametrization), False, True 

97# ) 

98# # All free parameters must be defined 

99# potential_defined = np.all(potential_defined, axis=(-2, -1)) 

100# paths["potential_defined"] = potential_defined[ 

101# indices[:, 0], 

102# indices[:, 1], 

103# indices[:, 2], 

104# indices[:, 3], 

105# indices[:, 4], 

106# ] 

107# return paths