Coverage for physped/preprocessing/trajectories.py: 90%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-01 09:28 +0000

1import logging 

2from pathlib import Path 

3 

4import numpy as np 

5import pandas as pd 

6from omegaconf import DictConfig 

7from scipy.signal import savgol_filter 

8 

9# from physped.io.readers import read_preprocessed_trajectories_from_file 

10from physped.io.writers import save_trajectories 

11from physped.utils.functions import ( 

12 cartesian_to_polar_coordinates, 

13 compose_functions, 

14) 

15 

16log = logging.getLogger(__name__) 

17 

18 

19def rename_columns(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame: 

20 """Rename columns of a DataFrame. 

21 

22 Args: 

23 df: The DataFrame to rename the columns of. 

24 colnames: A dictionary with the old column names as keys and the new 

25 column names as values. 

26 

27 Returns: 

28 The DataFrame with the columns renamed. 

29 """ 

30 # TODO : Use columnnames from parameters instead of renaming 

31 colnames = config.params.get("colnames", {}) 

32 inverted_colnames = {v: k for k, v in colnames.items()} 

33 df.rename(columns=inverted_colnames, inplace=True) 

34 log.info("Columns renamed to %s", list(df.columns)) 

35 return df 

36 

37 

38def prune_short_trajectories( 

39 trajectories: pd.DataFrame, config: DictConfig 

40) -> pd.DataFrame: 

41 """Remove short trajectories that are most likely noise. 

42 

43 The threshold for the minimum trajectory length is set in the 

44 configuration object under the key config.params.minimum_trajectory_length. 

45 

46 Args: 

47 trajectories: The DataFrame containing the trajectories. 

48 config: The configuration object. 

49 

50 Returns: 

51 The DataFrame without short trajectories. 

52 """ 

53 trajectories["traj_len"] = trajectories.groupby(["Pid"])["Pid"].transform( 

54 "size" 

55 ) 

56 trajectories = trajectories[ 

57 trajectories.traj_len > config.params.minimum_trajectory_length 

58 ].copy() 

59 log.info( 

60 "Short trajectories with less than %s observations removed.", 

61 config.params.minimum_trajectory_length, 

62 ) 

63 return trajectories 

64 

65 

66def add_trajectory_index(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame: 

67 """Add a column with the observation/step numbers of each trajectory. 

68 

69 Args: 

70 df: The DataFrame to add the trajectory step/observation to. 

71 config: The configuration object. 

72 

73 Returns: 

74 The DataFrame with the trajectory step/observation added. 

75 """ 

76 # pid_col, time_col = params.colnames.Pid, params.colnames.time 

77 pid_col, time_col = "Pid", "time" 

78 df.sort_values(by=[pid_col, time_col], inplace=True) 

79 df["k"] = df.groupby(pid_col)[pid_col].transform( 

80 lambda x: np.arange(x.size) 

81 ) 

82 log.info("Trajectory step added.") 

83 return df 

84 

85 

86def compute_velocity_from_positions( 

87 df: pd.DataFrame, config: DictConfig 

88) -> pd.DataFrame: 

89 """Add velocity to dataframe with trajectories. 

90 

91 This function calculates the velocity of each pedestrian in the input 

92 DataFrame based on their position data. The velocity is calculated using 

93 the Savitzky-Golay filter with a polynomial order of 1. 

94 

95 Args: 

96 df: The input DataFrame with the trajectories. 

97 

98 Returns: 

99 The trajectories DataFrame with velocity columns added. 

100 """ 

101 uf = config.params.colnames.get("uf", None) 

102 vf = config.params.colnames.get("vf", None) 

103 

104 if uf is not None or vf is not None: 

105 log.warning( 

106 "Columns for velocity found in parameters, no need to calculate " 

107 "velocity." 

108 ) 

109 return df 

110 

111 groupby = "Pid" 

112 xpos = "xf" 

113 ypos = "yf" 

114 pos_to_vel = {"xf": "uf", "yf": "vf"} 

115 window_length = config.params.velocity_window_length 

116 # window_length = parameters.minimum_trajectory_length - 1 

117 for direction in [xpos, ypos]: 

118 df.loc[:, pos_to_vel[direction]] = df.groupby([groupby])[ 

119 direction 

120 ].transform( 

121 lambda x: savgol_filter( 

122 x, 

123 window_length=window_length, 

124 polyorder=1, 

125 deriv=1, 

126 mode="interp", 

127 ) 

128 * config.params.fps 

129 ) 

130 log.info("Velocities 'uf' and 'vf' added.") 

131 return df 

132 

133 

134def transform_fast_velocity_to_polar_coordinates( 

135 df: pd.DataFrame, config: DictConfig 

136) -> pd.DataFrame: 

137 """Add columns with the velocity in polar coordinates to the DataFrame. 

138 

139 Requires the columns 'uf' and 'vf' for the velocity in x and y direction, 

140 respectively. 

141 

142 Args: 

143 df: The trajectory DataFrame to add polar coordinates to. 

144 

145 Returns: 

146 The DataFrame with additional columns 'rf' and 'thetaf' for the fast 

147 velocity in polar coordinates. 

148 """ 

149 df["rf"], df["thetaf"] = cartesian_to_polar_coordinates(df["uf"], df["vf"]) 

150 log.info("Velocity in polar coordinates 'rf' and 'thetaf' added") 

151 return df 

152 

153 

154def transform_slow_velocity_to_polar_coordinates( 

155 df: pd.DataFrame, config: DictConfig 

156) -> pd.DataFrame: 

157 """Add columns with the velocity in polar coordinates to the DataFrame. 

158 

159 Requires the columns 'us' and 'vs' for the velocity in x and y direction, 

160 respectively. 

161 

162 Args: 

163 df: The trajectory DataFrame to add polar coordinates to. 

164 

165 Returns: 

166 The DataFrame with additional columns 'rs' and 'thetas' for the slow 

167 velocity in polar coordinates. 

168 """ 

169 df["rs"], df["thetas"] = cartesian_to_polar_coordinates(df["us"], df["vs"]) 

170 log.info("Velocity in polar coordinates 'rs' and 'thetas' added") 

171 return df 

172 

173 

174# def add_acceleration(df: pd.DataFrame, parameters: dict) -> pd.DataFrame: 

175# framerate = parameters.fps 

176# groupby = "Pid" 

177# xcol = "uf" 

178# ycol = "vf" 

179# new_col = {"uf": "axf", "vf": "ayf"} 

180# window_length = parameters.velocity_window_length 

181# # window_length = parameters.minimum_trajectory_length - 1 

182# for direction in [xcol, ycol]: 

183# df.loc[:, new_col[direction]] = 

184# df.groupby([groupby])[direction].transform( 

185# lambda x: savgol_filter(x, window_length=window_length, 

186# polyorder=2, deriv=2, mode="interp") * framerate 

187# ) 

188# return df 

189 

190 

191def save_preprocessed_trajectories( 

192 df: pd.DataFrame, config: DictConfig 

193) -> None: 

194 """Save the preprocessed trajectories to a file. 

195 

196 The file is saved if the configuration object has the key 

197 'save.preprocessed_trajectories' set to True. 

198 

199 Args: 

200 df: The DataFrame with the preprocessed trajectories. 

201 config: The configuration object. 

202 

203 Returns: 

204 The DataFrame with the preprocessed trajectories. 

205 """ 

206 if config.save.preprocessed_trajectories: 

207 log.debug( 

208 "Configuration 'save.preprocessed_trajectories' is set to True." 

209 ) 

210 save_trajectories( 

211 df, Path.cwd().parent, config.filename.preprocessed_trajectories 

212 ) 

213 return df 

214 

215 

216def preprocess_trajectories( 

217 trajectories: pd.DataFrame, config: DictConfig 

218) -> pd.DataFrame: 

219 """Preprocess trajectories. 

220 

221 Args: 

222 trajectories: The DataFrame with the trajectories. 

223 

224 Keyword Args: 

225 config: The configuration object. 

226 

227 Returns: 

228 The preprocessed DataFrame with the trajectories. 

229 """ 

230 log.info("Preprocess trajectories") 

231 preprocessing_pipeline = compose_functions( 

232 rename_columns, 

233 prune_short_trajectories, 

234 add_trajectory_index, 

235 compute_velocity_from_positions, 

236 transform_fast_velocity_to_polar_coordinates, 

237 # save_preprocessed_trajectories, 

238 ) 

239 return preprocessing_pipeline(trajectories, config=config) 

240 

241 

242# get_preprocessed_trajectories = { 

243# "compute": preprocess_trajectories, 

244# "read": read_preprocessed_trajectories_from_file, 

245# }