Coverage for physped/preprocessing/trajectories.py: 90%
58 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-01 09:28 +0000
1import logging
2from pathlib import Path
4import numpy as np
5import pandas as pd
6from omegaconf import DictConfig
7from scipy.signal import savgol_filter
9# from physped.io.readers import read_preprocessed_trajectories_from_file
10from physped.io.writers import save_trajectories
11from physped.utils.functions import (
12 cartesian_to_polar_coordinates,
13 compose_functions,
14)
16log = logging.getLogger(__name__)
19def rename_columns(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
20 """Rename columns of a DataFrame.
22 Args:
23 df: The DataFrame to rename the columns of.
24 colnames: A dictionary with the old column names as keys and the new
25 column names as values.
27 Returns:
28 The DataFrame with the columns renamed.
29 """
30 # TODO : Use columnnames from parameters instead of renaming
31 colnames = config.params.get("colnames", {})
32 inverted_colnames = {v: k for k, v in colnames.items()}
33 df.rename(columns=inverted_colnames, inplace=True)
34 log.info("Columns renamed to %s", list(df.columns))
35 return df
38def prune_short_trajectories(
39 trajectories: pd.DataFrame, config: DictConfig
40) -> pd.DataFrame:
41 """Remove short trajectories that are most likely noise.
43 The threshold for the minimum trajectory length is set in the
44 configuration object under the key config.params.minimum_trajectory_length.
46 Args:
47 trajectories: The DataFrame containing the trajectories.
48 config: The configuration object.
50 Returns:
51 The DataFrame without short trajectories.
52 """
53 trajectories["traj_len"] = trajectories.groupby(["Pid"])["Pid"].transform(
54 "size"
55 )
56 trajectories = trajectories[
57 trajectories.traj_len > config.params.minimum_trajectory_length
58 ].copy()
59 log.info(
60 "Short trajectories with less than %s observations removed.",
61 config.params.minimum_trajectory_length,
62 )
63 return trajectories
66def add_trajectory_index(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
67 """Add a column with the observation/step numbers of each trajectory.
69 Args:
70 df: The DataFrame to add the trajectory step/observation to.
71 config: The configuration object.
73 Returns:
74 The DataFrame with the trajectory step/observation added.
75 """
76 # pid_col, time_col = params.colnames.Pid, params.colnames.time
77 pid_col, time_col = "Pid", "time"
78 df.sort_values(by=[pid_col, time_col], inplace=True)
79 df["k"] = df.groupby(pid_col)[pid_col].transform(
80 lambda x: np.arange(x.size)
81 )
82 log.info("Trajectory step added.")
83 return df
86def compute_velocity_from_positions(
87 df: pd.DataFrame, config: DictConfig
88) -> pd.DataFrame:
89 """Add velocity to dataframe with trajectories.
91 This function calculates the velocity of each pedestrian in the input
92 DataFrame based on their position data. The velocity is calculated using
93 the Savitzky-Golay filter with a polynomial order of 1.
95 Args:
96 df: The input DataFrame with the trajectories.
98 Returns:
99 The trajectories DataFrame with velocity columns added.
100 """
101 uf = config.params.colnames.get("uf", None)
102 vf = config.params.colnames.get("vf", None)
104 if uf is not None or vf is not None:
105 log.warning(
106 "Columns for velocity found in parameters, no need to calculate "
107 "velocity."
108 )
109 return df
111 groupby = "Pid"
112 xpos = "xf"
113 ypos = "yf"
114 pos_to_vel = {"xf": "uf", "yf": "vf"}
115 window_length = config.params.velocity_window_length
116 # window_length = parameters.minimum_trajectory_length - 1
117 for direction in [xpos, ypos]:
118 df.loc[:, pos_to_vel[direction]] = df.groupby([groupby])[
119 direction
120 ].transform(
121 lambda x: savgol_filter(
122 x,
123 window_length=window_length,
124 polyorder=1,
125 deriv=1,
126 mode="interp",
127 )
128 * config.params.fps
129 )
130 log.info("Velocities 'uf' and 'vf' added.")
131 return df
134def transform_fast_velocity_to_polar_coordinates(
135 df: pd.DataFrame, config: DictConfig
136) -> pd.DataFrame:
137 """Add columns with the velocity in polar coordinates to the DataFrame.
139 Requires the columns 'uf' and 'vf' for the velocity in x and y direction,
140 respectively.
142 Args:
143 df: The trajectory DataFrame to add polar coordinates to.
145 Returns:
146 The DataFrame with additional columns 'rf' and 'thetaf' for the fast
147 velocity in polar coordinates.
148 """
149 df["rf"], df["thetaf"] = cartesian_to_polar_coordinates(df["uf"], df["vf"])
150 log.info("Velocity in polar coordinates 'rf' and 'thetaf' added")
151 return df
154def transform_slow_velocity_to_polar_coordinates(
155 df: pd.DataFrame, config: DictConfig
156) -> pd.DataFrame:
157 """Add columns with the velocity in polar coordinates to the DataFrame.
159 Requires the columns 'us' and 'vs' for the velocity in x and y direction,
160 respectively.
162 Args:
163 df: The trajectory DataFrame to add polar coordinates to.
165 Returns:
166 The DataFrame with additional columns 'rs' and 'thetas' for the slow
167 velocity in polar coordinates.
168 """
169 df["rs"], df["thetas"] = cartesian_to_polar_coordinates(df["us"], df["vs"])
170 log.info("Velocity in polar coordinates 'rs' and 'thetas' added")
171 return df
174# def add_acceleration(df: pd.DataFrame, parameters: dict) -> pd.DataFrame:
175# framerate = parameters.fps
176# groupby = "Pid"
177# xcol = "uf"
178# ycol = "vf"
179# new_col = {"uf": "axf", "vf": "ayf"}
180# window_length = parameters.velocity_window_length
181# # window_length = parameters.minimum_trajectory_length - 1
182# for direction in [xcol, ycol]:
183# df.loc[:, new_col[direction]] =
184# df.groupby([groupby])[direction].transform(
185# lambda x: savgol_filter(x, window_length=window_length,
186# polyorder=2, deriv=2, mode="interp") * framerate
187# )
188# return df
191def save_preprocessed_trajectories(
192 df: pd.DataFrame, config: DictConfig
193) -> None:
194 """Save the preprocessed trajectories to a file.
196 The file is saved if the configuration object has the key
197 'save.preprocessed_trajectories' set to True.
199 Args:
200 df: The DataFrame with the preprocessed trajectories.
201 config: The configuration object.
203 Returns:
204 The DataFrame with the preprocessed trajectories.
205 """
206 if config.save.preprocessed_trajectories:
207 log.debug(
208 "Configuration 'save.preprocessed_trajectories' is set to True."
209 )
210 save_trajectories(
211 df, Path.cwd().parent, config.filename.preprocessed_trajectories
212 )
213 return df
216def preprocess_trajectories(
217 trajectories: pd.DataFrame, config: DictConfig
218) -> pd.DataFrame:
219 """Preprocess trajectories.
221 Args:
222 trajectories: The DataFrame with the trajectories.
224 Keyword Args:
225 config: The configuration object.
227 Returns:
228 The preprocessed DataFrame with the trajectories.
229 """
230 log.info("Preprocess trajectories")
231 preprocessing_pipeline = compose_functions(
232 rename_columns,
233 prune_short_trajectories,
234 add_trajectory_index,
235 compute_velocity_from_positions,
236 transform_fast_velocity_to_polar_coordinates,
237 # save_preprocessed_trajectories,
238 )
239 return preprocessing_pipeline(trajectories, config=config)
242# get_preprocessed_trajectories = {
243# "compute": preprocess_trajectories,
244# "read": read_preprocessed_trajectories_from_file,
245# }