Coverage for physped/preprocessing/trajectories.py: 90%
58 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 19:07 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-17 19:07 +0000
1import logging
2from pathlib import Path
4import numpy as np
5import pandas as pd
6from omegaconf import DictConfig
7from scipy.signal import savgol_filter
9# from physped.io.readers import read_preprocessed_trajectories_from_file
10from physped.io.writers import save_trajectories
11from physped.utils.functions import (
12 cartesian_to_polar_coordinates,
13 compose_functions,
14)
16log = logging.getLogger(__name__)
def rename_columns(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
    """Rename the columns of a DataFrame in place.

    The mapping is read from ``config.params`` under the key ``colnames``;
    an empty mapping is used when that key is absent. Each configured entry
    is inverted before renaming, i.e. an entry ``key: value`` renames the
    column currently called ``value`` to ``key``.

    Args:
        df: The DataFrame whose columns are renamed. It is modified in place.
        config: The configuration object providing ``params.colnames``.

    Returns:
        The same DataFrame object with the columns renamed.
    """
    # TODO : Use columnnames from parameters instead of renaming
    configured_names = config.params.get("colnames", {})
    # DataFrame.rename expects {current name: new name}, which is the
    # inverse of how the mapping is stored in the configuration.
    current_to_new = {}
    for new_name, current_name in configured_names.items():
        current_to_new[current_name] = new_name
    df.rename(columns=current_to_new, inplace=True)
    resulting_columns = list(df.columns)
    log.info("Columns renamed to %s", resulting_columns)
    return df
def prune_short_trajectories(
    trajectories: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Drop trajectories with too few observations (most likely noise).

    The number of rows per ``Pid`` is stored in a new ``traj_len`` column,
    which is written onto the DataFrame that was passed in. Only rows whose
    ``traj_len`` is strictly greater than
    ``config.params.minimum_trajectory_length`` are kept.

    Args:
        trajectories: The DataFrame containing the trajectories. It needs a
            ``Pid`` column and receives a ``traj_len`` column as side effect.
        config: The configuration object.

    Returns:
        A copy of the rows belonging to sufficiently long trajectories.
    """
    observations_per_pid = trajectories.groupby(["Pid"])["Pid"].transform(
        "size"
    )
    trajectories["traj_len"] = observations_per_pid
    minimum_length = config.params.minimum_trajectory_length
    is_long_enough = trajectories["traj_len"] > minimum_length
    # TODO: Is a hardcopy needed?
    pruned = trajectories[is_long_enough].copy()
    log.info(
        "Short trajectories with less than %s observations removed.",
        config.params.minimum_trajectory_length,
    )
    return pruned
def add_trajectory_index(df: pd.DataFrame, config: DictConfig) -> pd.DataFrame:
    """Add a column with the observation/step numbers of each trajectory.

    The DataFrame is sorted in place by ``Pid`` and ``time``; afterwards a
    column ``k`` is added that numbers the rows of every ``Pid`` group
    0, 1, 2, ... in that sorted order.

    Args:
        df: The DataFrame to add the trajectory step/observation to. It must
            contain the columns ``Pid`` and ``time`` and is modified in place.
        config: The configuration object. Currently not read by this
            function; kept so that all preprocessing steps share a signature.

    Returns:
        The same DataFrame object, sorted and with the column ``k`` added.
    """
    # TODO: take the column names from config.params.colnames
    # (Pid / time) instead of hard-coding them.
    pid_col, time_col = "Pid", "time"
    df.sort_values(by=[pid_col, time_col], inplace=True)
    # cumcount() numbers the rows within each group starting at 0, which is
    # the vectorised equivalent of building np.arange(group size) per group.
    df["k"] = df.groupby(pid_col).cumcount()
    log.info("Trajectory step added.")
    return df
def compute_velocity_from_positions(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Add velocity to dataframe with trajectories.

    The velocity components ``uf`` and ``vf`` are derived per ``Pid`` from
    the position columns ``xf`` and ``yf``. The first derivative is taken
    with a Savitzky-Golay filter of polynomial order 1 and window length
    ``config.params.velocity_window_length``; the per-sample derivative is
    then multiplied by ``config.params.fps``.

    Nothing is computed when the configured column names
    (``config.params.colnames``) already contain an entry for ``uf`` or
    ``vf``; a missing ``colnames`` section is treated as an empty mapping,
    in line with ``rename_columns``.

    Args:
        df: The input DataFrame with the trajectories. It is modified in
            place.
        config: The configuration object providing ``params.colnames``
            (optional), ``params.velocity_window_length`` and ``params.fps``.

    Returns:
        The trajectories DataFrame with velocity columns added, or the
        unchanged DataFrame when velocity columns are configured.
    """
    # Tolerate a configuration without a colnames section.
    colnames = config.params.get("colnames") or {}
    uf = colnames.get("uf", None)
    vf = colnames.get("vf", None)

    # NOTE(review): a single configured component is enough to skip the
    # computation of both - confirm that a partially configured velocity
    # cannot occur, otherwise the other column will be missing downstream.
    if uf is not None or vf is not None:
        log.warning(
            "Columns for velocity found in parameters, no need to calculate "
            "velocity."
        )
        return df

    groupby = "Pid"
    pos_to_vel = {"xf": "uf", "yf": "vf"}
    window_length = config.params.velocity_window_length
    # Read once instead of once per trajectory inside the transform.
    fps = config.params.fps

    def _velocity(positions):
        # deriv=1 yields the derivative per sample; scaling by fps converts
        # it to the derivative per unit of time.
        return (
            savgol_filter(
                positions,
                window_length=window_length,
                polyorder=1,
                deriv=1,
                mode="interp",
            )
            * fps
        )

    for position_col, velocity_col in pos_to_vel.items():
        df.loc[:, velocity_col] = df.groupby([groupby])[
            position_col
        ].transform(_velocity)
    log.info("Velocities 'uf' and 'vf' added.")
    return df
def transform_fast_velocity_to_polar_coordinates(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Express the fast velocity in polar coordinates.

    The columns 'uf' and 'vf' (velocity in x and y direction) are passed to
    ``cartesian_to_polar_coordinates``; its two results are stored in the
    new columns 'rf' and 'thetaf'.

    Args:
        df: The trajectory DataFrame to add polar coordinates to. It is
            modified in place.
        config: The configuration object (not read by this function).

    Returns:
        The same DataFrame with the additional columns 'rf' and 'thetaf'.
    """
    fast_u = df["uf"]
    fast_v = df["vf"]
    radial, angular = cartesian_to_polar_coordinates(fast_u, fast_v)
    df["rf"] = radial
    df["thetaf"] = angular
    log.info("Velocity in polar coordinates 'rf' and 'thetaf' added")
    return df
def transform_slow_velocity_to_polar_coordinates(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Express the slow velocity in polar coordinates.

    The columns 'us' and 'vs' (velocity in x and y direction) are passed to
    ``cartesian_to_polar_coordinates``; its two results are stored in the
    new columns 'rs' and 'thetas'.

    Args:
        df: The trajectory DataFrame to add polar coordinates to. It is
            modified in place.
        config: The configuration object (not read by this function).

    Returns:
        The same DataFrame with the additional columns 'rs' and 'thetas'.
    """
    slow_u = df["us"]
    slow_v = df["vs"]
    radial, angular = cartesian_to_polar_coordinates(slow_u, slow_v)
    df["rs"] = radial
    df["thetas"] = angular
    log.info("Velocity in polar coordinates 'rs' and 'thetas' added")
    return df
175# def add_acceleration(df: pd.DataFrame, parameters: dict) -> pd.DataFrame:
176# framerate = parameters.fps
177# groupby = "Pid"
178# xcol = "uf"
179# ycol = "vf"
180# new_col = {"uf": "axf", "vf": "ayf"}
181# window_length = parameters.velocity_window_length
182# # window_length = parameters.minimum_trajectory_length - 1
183# for direction in [xcol, ycol]:
184# df.loc[:, new_col[direction]] =
185# df.groupby([groupby])[direction].transform(
186# lambda x: savgol_filter(x, window_length=window_length,
187# polyorder=2, deriv=2, mode="interp") * framerate
188# )
189# return df
def save_preprocessed_trajectories(
    df: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Save the preprocessed trajectories to a file.

    The file is saved if the configuration object has the key
    'save.preprocessed_trajectories' set to True. In that case
    ``save_trajectories`` is called with the parent of the current working
    directory and ``config.filename.preprocessed_trajectories``.

    Args:
        df: The DataFrame with the preprocessed trajectories.
        config: The configuration object.

    Returns:
        The DataFrame with the preprocessed trajectories, unchanged, so the
        function can be used as a step in a preprocessing pipeline.
    """
    if not config.save.preprocessed_trajectories:
        return df

    log.debug(
        "Configuration 'save.preprocessed_trajectories' is set to True."
    )
    # NOTE(review): the target is the *parent* of the working directory -
    # presumably because the run changes into a per-run subfolder; confirm.
    save_trajectories(
        df, Path.cwd().parent, config.filename.preprocessed_trajectories
    )
    return df
def preprocess_trajectories(
    trajectories: pd.DataFrame, config: DictConfig
) -> pd.DataFrame:
    """Run the preprocessing pipeline on raw trajectories.

    The individual steps are combined with ``compose_functions`` and the
    resulting callable is invoked with the trajectories and the
    configuration. The steps handed to ``compose_functions`` are, as listed:
    renaming columns, pruning short trajectories, adding the trajectory
    index, computing velocities and converting the fast velocity to polar
    coordinates.

    Args:
        trajectories: The DataFrame with the trajectories.

    Keyword Args:
        config: The configuration object.

    Returns:
        The preprocessed DataFrame with the trajectories.
    """
    log.info("Preprocess trajectories")
    steps = (
        rename_columns,
        prune_short_trajectories,
        add_trajectory_index,
        compute_velocity_from_positions,
        transform_fast_velocity_to_polar_coordinates,
        # save_preprocessed_trajectories,
    )
    run_pipeline = compose_functions(*steps)
    preprocessed = run_pipeline(trajectories, config=config)
    return preprocessed
243# get_preprocessed_trajectories = {
244# "compute": preprocess_trajectories,
245# "read": read_preprocessed_trajectories_from_file,
246# }