Coverage for physped/preprocessing/trajectories.py: 90%

58 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-17 19:07 +0000

1import logging 

2from pathlib import Path 

3 

4import numpy as np 

5import pandas as pd 

6from omegaconf import DictConfig 

7from scipy.signal import savgol_filter 

8 

9# from physped.io.readers import read_preprocessed_trajectories_from_file 

10from physped.io.writers import save_trajectories 

11from physped.utils.functions import ( 

12 cartesian_to_polar_coordinates, 

13 compose_functions, 

14) 

15 

16log = logging.getLogger(__name__) 

17 

18 

def rename_columns(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
    """Rename the columns of a DataFrame using the configured column names.

    The mapping is read from ``config.params.colnames``. Each key of that
    mapping is the new column name and each value is the column name that is
    currently present in ``df``; the mapping is inverted before it is handed
    to ``DataFrame.rename``.

    Args:
        df: The DataFrame to rename the columns of. It is modified in place.
        config: The configuration object. ``config.params.colnames`` is
            optional: when it is missing, ``None`` or empty, no column is
            renamed.

    Returns:
        The same DataFrame object with the columns renamed.
    """
    # TODO : Use columnnames from parameters instead of renaming
    # `or {}` covers a 'colnames' key that is present but set to null.
    colnames = config.params.get("colnames", {}) or {}
    current_to_new = {current: new for new, current in colnames.items()}
    df.rename(columns=current_to_new, inplace=True)
    log.info("Columns renamed to %s", list(df.columns))
    return df

36 

37 

def prune_short_trajectories(
    trajectories: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Remove short trajectories that are most likely noise.

    The threshold is read from ``config.params.minimum_trajectory_length``.
    A trajectory is kept only when its number of observations is strictly
    greater than the threshold, so trajectories with exactly that many
    observations are removed as well.

    Args:
        trajectories: The DataFrame containing the trajectories, with a
            'Pid' column identifying the trajectory of each row. A
            'traj_len' column holding the number of rows per 'Pid' is added
            to this DataFrame in place.
        config: The configuration object.

    Returns:
        A copy of the rows that belong to the remaining trajectories,
        including the 'traj_len' column.
    """
    min_length = config.params.minimum_trajectory_length
    trajectories["traj_len"] = trajectories.groupby(["Pid"])["Pid"].transform(
        "size"
    )
    # Copy so that later steps add columns to an independent DataFrame
    # instead of to a slice of the input.
    kept = trajectories[trajectories["traj_len"] > min_length].copy()
    # The filter is strict (>), so the message states "or fewer".
    log.info(
        "Short trajectories with %s or fewer observations removed.",
        min_length,
    )
    return kept

65 

66 

def add_trajectory_index(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
    """Add a column with the observation/step numbers of each trajectory.

    The DataFrame is sorted in place by 'Pid' and 'time'. Within every 'Pid'
    the rows are then numbered 0, 1, 2, ... in that order and the number is
    stored in the column 'k'.

    Args:
        df: The DataFrame to add the trajectory step/observation to. Must
            contain the columns 'Pid' and 'time'. It is modified in place.
        config: The configuration object. Not used by this function.

    Returns:
        The same DataFrame object, sorted and with the column 'k' added.
    """
    # pid_col, time_col = params.colnames.Pid, params.colnames.time
    pid_col, time_col = "Pid", "time"
    df.sort_values(by=[pid_col, time_col], inplace=True)
    # cumcount numbers the rows of each group in their current (sorted)
    # order without calling a Python function once per trajectory.
    df["k"] = df.groupby(pid_col).cumcount()
    log.info("Trajectory step added.")
    return df

85 

86 

def compute_velocity_from_positions(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add velocity to dataframe with trajectories.

    For every 'Pid' the columns 'xf' and 'yf' are differentiated with a
    Savitzky-Golay filter (polynomial order 1, first derivative) and the
    result is multiplied by ``config.params.fps``. The velocities are stored
    in the columns 'uf' and 'vf'.

    Nothing is computed when ``config.params.colnames`` maps 'uf' or 'vf' to
    a column name; the DataFrame is then returned unchanged.

    Args:
        df: The input DataFrame with the trajectories. Must contain the
            columns 'Pid', 'xf' and 'yf'. It is modified in place.
        config: The configuration object. Reads
            ``params.velocity_window_length`` and ``params.fps``;
            ``params.colnames`` is optional.

    Returns:
        The trajectories DataFrame with velocity columns added.
    """
    # Look up 'colnames' the same way rename_columns does, so that a
    # configuration without that section (or with it set to null) does not
    # fail here.
    colnames = config.params.get("colnames", {}) or {}
    uf = colnames.get("uf", None)
    vf = colnames.get("vf", None)

    if uf is not None or vf is not None:
        log.warning(
            "Columns for velocity found in parameters, no need to calculate "
            "velocity."
        )
        return df

    window_length = config.params.velocity_window_length
    # window_length = parameters.minimum_trajectory_length - 1
    # Read once instead of on every per-trajectory call.
    fps = config.params.fps

    def _velocity(positions: pd.Series) -> np.ndarray:
        # The filter differentiates per sample; multiplying by fps rescales
        # it. NOTE(review): assumes consecutive rows of a trajectory are one
        # frame apart -- confirm against the data source.
        return (
            savgol_filter(
                positions,
                window_length=window_length,
                polyorder=1,
                deriv=1,
                mode="interp",
            )
            * fps
        )

    pos_to_vel = {"xf": "uf", "yf": "vf"}
    for pos_col, vel_col in pos_to_vel.items():
        df.loc[:, vel_col] = df.groupby(["Pid"])[pos_col].transform(_velocity)
    log.info("Velocities 'uf' and 'vf' added.")
    return df

133 

134 

def transform_fast_velocity_to_polar_coordinates(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add the fast velocity in polar coordinates to the DataFrame.

    The columns 'uf' and 'vf' (velocity in x and y direction, respectively)
    are passed to ``cartesian_to_polar_coordinates`` and the two results are
    stored in the columns 'rf' and 'thetaf'.

    Args:
        df: The trajectory DataFrame to add polar coordinates to. It is
            modified in place.
        config: The configuration object. Not used by this function.

    Returns:
        The same DataFrame with the additional columns 'rf' and 'thetaf'.
    """
    first, second = cartesian_to_polar_coordinates(df["uf"], df["vf"])
    df["rf"] = first
    df["thetaf"] = second
    log.info("Velocity in polar coordinates 'rf' and 'thetaf' added")
    return df

153 

154 

def transform_slow_velocity_to_polar_coordinates(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add the slow velocity in polar coordinates to the DataFrame.

    The columns 'us' and 'vs' (velocity in x and y direction, respectively)
    are passed to ``cartesian_to_polar_coordinates`` and the two results are
    stored in the columns 'rs' and 'thetas'.

    Args:
        df: The trajectory DataFrame to add polar coordinates to. It is
            modified in place.
        config: The configuration object. Not used by this function.

    Returns:
        The same DataFrame with the additional columns 'rs' and 'thetas'.
    """
    first, second = cartesian_to_polar_coordinates(df["us"], df["vs"])
    df["rs"] = first
    df["thetas"] = second
    log.info("Velocity in polar coordinates 'rs' and 'thetas' added")
    return df

173 

174 

175# def add_acceleration(df: pd.DataFrame, parameters: dict) -> pd.DataFrame: 

176# framerate = parameters.fps 

177# groupby = "Pid" 

178# xcol = "uf" 

179# ycol = "vf" 

180# new_col = {"uf": "axf", "vf": "ayf"} 

181# window_length = parameters.velocity_window_length 

182# # window_length = parameters.minimum_trajectory_length - 1 

183# for direction in [xcol, ycol]: 

184# df.loc[:, new_col[direction]] = 

185# df.groupby([groupby])[direction].transform( 

186# lambda x: savgol_filter(x, window_length=window_length, 

187# polyorder=2, deriv=2, mode="interp") * framerate 

188# ) 

189# return df 

190 

191 

def save_preprocessed_trajectories(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Save the preprocessed trajectories to a file.

    The file is saved if the configuration object has the key
    'save.preprocessed_trajectories' set to True. In that case the DataFrame
    is passed to ``save_trajectories`` together with the parent of the
    current working directory and
    ``config.filename.preprocessed_trajectories``.

    Args:
        df: The DataFrame with the preprocessed trajectories.
        config: The configuration object.

    Returns:
        The DataFrame with the preprocessed trajectories, unchanged, so the
        function can be used as a pass-through step.
    """
    if config.save.preprocessed_trajectories:
        log.debug(
            "Configuration 'save.preprocessed_trajectories' is set to True."
        )
        # NOTE(review): the target is the parent of the working directory,
        # not the working directory itself -- confirm this is intended.
        save_trajectories(
            df, Path.cwd().parent, config.filename.preprocessed_trajectories
        )
    return df

215 

216 

def preprocess_trajectories(
    trajectories: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Run the preprocessing steps on the trajectories.

    The individual steps are combined with ``compose_functions`` and the
    combined callable is invoked with the trajectories and the configuration.
    NOTE(review): the steps are presumably applied in the order listed
    below -- confirm against ``compose_functions``.

    Args:
        trajectories: The DataFrame with the trajectories.

    Keyword Args:
        config: The configuration object, forwarded to the combined steps.

    Returns:
        The preprocessed DataFrame with the trajectories.
    """
    log.info("Preprocess trajectories")
    steps = (
        rename_columns,
        prune_short_trajectories,
        add_trajectory_index,
        compute_velocity_from_positions,
        transform_fast_velocity_to_polar_coordinates,
        # save_preprocessed_trajectories,
    )
    run_pipeline = compose_functions(*steps)
    return run_pipeline(trajectories, config=config)

241 

242 

243# get_preprocessed_trajectories = { 

244# "compute": preprocess_trajectories, 

245# "read": read_preprocessed_trajectories_from_file, 

246# }