Source code for physped.io.readers

"""Data readers for the physics based pedestrian modeling code."""

import glob
import io
import logging
import pickle
import zipfile
from fnmatch import fnmatch
from io import BytesIO
from pathlib import Path
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from omegaconf import DictConfig
from PIL import Image
from tqdm import tqdm

from physped.core.piecewise_potential import PiecewisePotential

log = logging.getLogger(__name__)


def read_trajectories_from_file(filepath: Path) -> pd.DataFrame:
    """Read preprocessed trajectories from a csv file.

    Mainly used to read intermediate outputs.

    Args:
        filepath: Path to the csv file with the preprocessed trajectories.

    Returns:
        The preprocessed trajectory dataset.
    """
    trajectories = pd.read_csv(filepath)
    log.warning("Trajectories read from file.")
    return trajectories


def read_piecewise_potential(config: DictConfig) -> PiecewisePotential:
    """Read the piecewise potential if the configuration requests it.

    Returns None when reading is disabled or the file is missing.
    """
    filepath = Path.cwd().parent / config.filename.piecewise_potential
    if config.read.piecewise_potential:
        log.debug(
            "Configuration 'read.piecewise_potential' is set to True."
        )
        try:
            piecewise_potential = read_piecewise_potential_from_file(filepath)
            log.warning("Piecewise potential read from file")
            # log.debug("Filepath %s", filepath.relative_to(config.root_dir))
            return piecewise_potential
        except FileNotFoundError as e:
            log.error("Piecewise potential not found: %s", e)


def read_piecewise_potential_from_file(filepath: Path) -> PiecewisePotential:
    """Read piecewise potential from file.

    Args:
        filepath: Path to the file containing the piecewise potential.

    Returns:
        The piecewise potential.
    """
    with open(filepath, "rb") as file:
        piecewise_potential = pickle.load(file)
    return piecewise_potential


def read_narrow_corridor_paths_local(
    config: DictConfig,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the narrow corridor paths archive from a local zip.

    Args:
        config: The configuration parameters.

    Returns:
        A tuple containing two DataFrames:
            - df_ltr: Paths of pedestrians walking from left to right.
            - df_rtl: Paths of pedestrians walking from right to left.
    """
    trajectory_data_dir = Path(config.trajectory_data_dir)
    log.info("Start reading narrow corridor data set.")
    archive = zipfile.ZipFile(trajectory_data_dir / "data.zip")
    with archive.open("left-to-right.ssv") as paths_ltr:
        paths_ltr = paths_ltr.read().decode("utf-8")
        df_ltr = pd.read_csv(io.StringIO(paths_ltr), sep=" ")
    with archive.open("right-to-left.ssv") as paths_rtl:
        paths_rtl = paths_rtl.read().decode("utf-8")
        df_rtl = pd.read_csv(io.StringIO(paths_rtl), sep=" ")
    return df_ltr, df_rtl


def read_narrow_corridor_paths_4tu(
    config: DictConfig,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the narrow corridor paths archive from the 4TU remote repository.

    Args:
        config: The configuration parameters.

    Returns:
        A tuple containing two DataFrames:
            - df_ltr: Paths of pedestrians walking from left to right.
            - df_rtl: Paths of pedestrians walking from right to left.
    """
    link = (
        "https://data.4tu.nl/ndownloader/items/"
        "b8e30f8c-3931-4604-842a-77c7fb8ac3fc/versions/1"
    )
    bytestring = requests.get(link, timeout=10)
    with zipfile.ZipFile(io.BytesIO(bytestring.content), "r") as outerzip:
        with zipfile.ZipFile(outerzip.open("data.zip")) as innerzip:
            with innerzip.open("left-to-right.ssv") as paths_ltr:
                paths_ltr = paths_ltr.read().decode("utf-8")
            with innerzip.open("right-to-left.ssv") as paths_rtl:
                paths_rtl = paths_rtl.read().decode("utf-8")
    df_ltr = pd.read_csv(io.StringIO(paths_ltr), sep=" ")
    df_rtl = pd.read_csv(io.StringIO(paths_rtl), sep=" ")
    return df_ltr, df_rtl


narrow_corridor_path_reader = {
    "local": read_narrow_corridor_paths_local,
    "4tu": read_narrow_corridor_paths_4tu,
}


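The dictionary above is the dispatch pattern used throughout this module: a
configuration key selects the read backend. A minimal usage sketch, assuming
an OmegaConf config with params.data_source and trajectory_data_dir set (the
values below are illustrative, not taken from the shipped configs):

    from omegaconf import OmegaConf

    cfg = OmegaConf.create(
        {
            "params": {"data_source": "local"},
            "trajectory_data_dir": "data/narrow_corridor",  # assumed path
        }
    )
    reader = narrow_corridor_path_reader[cfg.params.data_source]
    df_ltr, df_rtl = reader(cfg)  # one DataFrame per walking direction

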
def read_intersecting_paths(config: DictConfig) -> pd.DataFrame:
    """Read the intersecting paths data set.

    The intersecting paths dataset is created by combining the left-to-right
    and right-to-left paths from the narrow corridor dataset. The paths from
    the right-to-left dataset are rotated by 90 degrees to create
    intersecting paths.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with intersecting paths.
    """
    data_source = config.params.data_source
    df_ltr, df_rtl = narrow_corridor_path_reader[data_source](config)
    df_ltr["X_SG"] = df_ltr["X_SG"] + 0.1
    df_ltr["Y_SG"] = df_ltr["Y_SG"] - 0.05
    df_rtl["X_SG"] = df_rtl["X_SG"] + 0.1
    df_rtl["Y_SG"] = df_rtl["Y_SG"] - 0.05
    # swap x and y coordinates to rotate by 90 degrees
    df_rtl.rename(
        columns={
            "X": "Y",
            "Y": "X",
            "X_SG": "Y_SG",
            "Y_SG": "X_SG",
            "U_SG": "V_SG",
            "V_SG": "U_SG",
        },
        inplace=True,
    )
    df = pd.concat([df_ltr, df_rtl], ignore_index=True)
    log.info("Finished reading intersecting paths data set.")
    return df


def read_narrow_corridor_paths(config: DictConfig) -> pd.DataFrame:
    """Read the narrow corridor data set.

    The trajectories are read from local or remote sources based on the
    configuration.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with single paths.
    """
    data_source = config.params.data_source
    df_ltr, df_rtl = narrow_corridor_path_reader[data_source](config)
    df = pd.concat([df_ltr, df_rtl], ignore_index=True)
    df["X_SG"] = df["X_SG"] + 0.1
    df["Y_SG"] = df["Y_SG"] - 0.05
    # Only keep the columns that are needed
    df = df[["Pid", "Rstep", "X_SG", "Y_SG"]]
    log.info("Finished reading single paths data set.")
    return df


def read_wide_corridor_paths(config: DictConfig) -> pd.DataFrame:
    """Read the wide corridor data set from a local file.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with paths in the wide corridor.
    """
    trajectory_data_dir = (
        Path(config.root_dir).parent / config.trajectory_data_dir
    )
    file_path = trajectory_data_dir / "df_single_pedestrians_small.h5"
    df = pd.read_hdf(file_path)
    df.rename(
        columns={"X_SG": "xf", "Y_SG": "yf", "U_SG": "uf", "V_SG": "vf"},
        inplace=True,
    )
    df["xf"] = df["xf"] - 0.4
    df["yf"] = df["yf"] + 0.3
    df["Pid"] = df.groupby(["Pid", "day_id"]).ngroup()
    df.reset_index(inplace=True)
    df = df.query("Umean>0.5").loc[abs(df.X0 - df.X1) > 2]
    df = df.groupby("Pid").filter(lambda x: max(x.uf) < 3.5)
    return df


def read_curved_paths_synthetic(config: DictConfig) -> pd.DataFrame:
    """Read the synthetic curved paths data set.

    Args:
        config: The configuration parameters.

    Returns:
        The synthetic curved paths dataset.
    """
    root_dir = Path(config.root_dir).parent
    trajectory_data_dir = Path(config.trajectory_data_dir)
    file_path = (
        root_dir
        / trajectory_data_dir
        / "artificial_measurements_ellipse.parquet"
    )
    df = pd.read_parquet(file_path)
    df = df.rename(columns={"x": "xf", "y": "yf", "xdot": "uf", "ydot": "vf"})
    return df


# def filter_trajectory(df, cutoff=0.16, order=4):
#     b, a = signal.butter(order, cutoff, "low")
#     df = df.sort_values(["particle", "time"])
#     df = df.groupby("particle").filter(lambda x: len(x) > 52)
#     f_df = df.groupby(df["particle"]).apply(
#         lambda x: pd.DataFrame(
#             signal.filtfilt(b, a, x[["x", "y"]].values, axis=0)))
#     df[["x", "y"]] = f_df.set_index(df.index)
#     return df


# def savgol_smoothing(df: pd.DataFrame, smooth_colname,
#                      groupby_colname="Pid"):
#     slow = df.groupby(groupby_colname)[smooth_colname].transform(
#         lambda x: signal.savgol_filter(x, window_length=9,
#                                        polyorder=1, deriv=0, mode="interp")
#     )
#     return slow


# def read_curved_paths(config) -> pd.DataFrame:
#     # Trajectories recorded during experiment 1
#     trajectory_data_dir = Path(config.trajectory_data_dir)
#     trajs = pd.read_csv(trajectory_data_dir / "linecross-1.csv")
#     # trajs = filter_trajectory(trajs)
#     pid_column = "particle"
#     time_column = "time"
#     conversion_X = 2.30405921919033
#     conversion_Y = 2.35579871138595
#     trajs = trajs[trajs.frame > 380].copy()
#     trajs["x"] = trajs["x"] - np.mean(trajs["x"]) - 30
#     trajs["y"] = trajs["y"] - np.mean(trajs["y"]) + 35
#     trajs1 = trajs.copy()
#     # Trajectories recorded during experiment 2
#     trajs = pd.read_csv(trajectory_data_dir / "linecross-2.csv")
#     trajs = filter_trajectory(trajs)
#     trajs = trajs[trajs.frame > 380].copy()
#     trajs["x"] = trajs["x"] - np.mean(trajs["x"]) + 17
#     trajs["y"] = trajs["y"] - np.mean(trajs["y"]) + 33
#     trajs[pid_column] += np.max(trajs1[pid_column])
#     trajs2 = trajs.copy()
#     trajs = pd.concat([trajs1, trajs2])
#     trajs["x"] = trajs["x"] * conversion_X / 100
#     trajs["y"] = trajs["y"] * conversion_Y / 100
#     # trajs["v_x_m"] = trajs["v_x_m"].replace(-99, np.nan).interpolate()
#     # trajs["v_y_m"] = trajs["v_y_m"].replace(-99, np.nan).interpolate()
#     trajs["traj_len"] = \
#         trajs.groupby([pid_column])[pid_column].transform("size")
#     trajs = trajs[trajs.traj_len > 10].copy()
#     trajs.sort_values(by=[pid_column, time_column], inplace=True)
#     trajs["k"] = trajs.groupby(pid_column)[pid_column].transform(
#         lambda x: np.arange(x.size))
#     # trajs["kp"] = trajs["k"] // 320
#     # trajs["new_pid"] = trajs.apply(lambda x:
#     #     int(f"{x[pid_column]:06}{x['kp']:04}"), axis=1)
#     # trajs["new_pid"] = trajs[pid_column] * 100000 + trajs["kp"] + 100
#     trajs["x"] = savgol_smoothing(trajs, "x", pid_column)
#     # Smooth the noisy trajectories to get reasonable velocities
#     trajs["y"] = savgol_smoothing(trajs, "y", pid_column)
#     return trajs


def read_ehv_pf34_paths_geert(config: DictConfig) -> pd.DataFrame:
    """Read the Eindhoven platform 3-4 paths data set from Geert.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Eindhoven platform 3-4 paths
    """
    root_dir = Path(config.root_dir).parent
    trajectory_data_dir = Path(config.trajectory_data_dir)
    file_path = (
        root_dir
        / trajectory_data_dir
        / "trajectories_EHV_platform_2_1_refined.parquet"
    )
    df = pd.read_parquet(file_path)
    df = df[["date_time_utc", "Pid", "xf", "yf"]]
    # Rotate the domain
    df.rename(
        {"xf": "yf", "yf": "xf", "uf": "vf", "vf": "uf"}, axis=1, inplace=True
    )
    return df


def filter_part_of_the_domain(df, xmin, xmax, xcol="x_pos", ycol="y_pos"):
    """Keep only the rows with df[xcol] > xmin and df[ycol] < xmax."""
    df = df[df[xcol] > xmin].copy()
    df = df[df[ycol] < xmax].copy()
    return df


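A toy call for illustration (the column names match the defaults; the numbers
are arbitrary):

    toy = pd.DataFrame(
        {"x_pos": [45.0, 55.0, 60.0], "y_pos": [65.0, 65.0, 75.0]}
    )
    kept = filter_part_of_the_domain(toy, xmin=50, xmax=70)
    # Keeps rows with x_pos > 50 and y_pos < 70, i.e. only (55.0, 65.0).

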
def read_ehv_pf34_paths_local(config: DictConfig) -> pd.DataFrame:
    """Read the Eindhoven platform 3-4 paths data set from a local file.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Eindhoven platform 3-4 paths
    """
    trajectory_data_dir = Path(config.trajectory_data_dir)
    glob_string = f"{str(trajectory_data_dir)}/ehv_pf34/*.parquet"
    filelist = glob.glob(glob_string)
    df_list = []
    for file in tqdm(filelist, ascii=True):
        df = pd.read_parquet(file)
        df_list.append(df)
    df = pd.concat(df_list)
    # Rotate the domain by 90 degrees
    df.rename({"x_pos": "y_pos", "y_pos": "x_pos"}, axis=1, inplace=True)
    # Convert spatial coordinates from millimeters to meters
    df["x_pos"] /= 1000
    df["y_pos"] /= 1000
    df = filter_part_of_the_domain(df, xmin=50, xmax=70)
    return df


def get_zenodo_data(url: str) -> BytesIO:
    """Download data from a Zenodo URL into memory."""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return BytesIO(response.content)


def list_zenodo_files(config: DictConfig, pattern: str = None) -> List[dict]:
    """List all files in a Zenodo record, optionally filtered by pattern.

    Args:
        config: The configuration parameters.
        pattern: Optional glob pattern used to filter the file names.

    Returns:
        List of file information dictionaries
    """
    zenodo_repo_url = config.params.data_url
    data_record_id = config.params.data_record_id
    url = f"{zenodo_repo_url}/{data_record_id}"
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    files = response.json()["files"]
    if pattern:
        files = [f for f in files if fnmatch(f["key"], pattern)]
    return files


def read_parquet_from_zenodo(config: DictConfig) -> pd.DataFrame:
    """Read parquet file from Zenodo using Pandas.

    Args:
        config: The configuration parameters.

    Returns:
        DataFrame from Pandas
    """
    # Get file information
    files = list_zenodo_files(config, "*.parquet")
    if not files:
        raise ValueError(
            "No matching files found in record "
            f"{config.params.data_record_id}"
        )
    # Use first matching file if filename not specified
    file_info = files[0]  # TODO Reader for other files in the zenodo repo
    file_url = file_info["links"]["self"]
    # Get data into memory
    data = get_zenodo_data(file_url)
    # Read with specified library
    return pd.read_parquet(data, engine="pyarrow")


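A sketch of how the three Zenodo helpers chain together, assuming a config
that points params.data_url at the Zenodo records API and params.data_record_id
at some record (both values below are placeholders, not a verified record):

    from omegaconf import OmegaConf

    cfg = OmegaConf.create(
        {
            "params": {
                "data_url": "https://zenodo.org/api/records",  # assumed
                "data_record_id": "1234567",  # placeholder record id
            }
        }
    )
    files = list_zenodo_files(cfg, "*.parquet")  # file listing of the record
    buffer = get_zenodo_data(files[0]["links"]["self"])  # download to memory
    df = pd.read_parquet(buffer, engine="pyarrow")

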
def read_ehv_pf34_paths_zenodo(config: DictConfig) -> pd.DataFrame:
    """Read the Eindhoven platform 3-4 paths data set from Zenodo.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Eindhoven platform 3-4 paths
    """
    df = read_parquet_from_zenodo(config)
    df = df.reset_index(drop=True).head(
        10000000
    )  # TODO: Fix memory issues in a better way
    # Convert spatial coordinates from millimeters to meters
    xcol = config.params.colnames.xf
    ycol = config.params.colnames.yf
    df[xcol] /= 1000
    df[ycol] /= 1000
    df = filter_part_of_the_domain(df, xmin=50, xmax=70, xcol=xcol, ycol=ycol)
    return df


ehv_pf34_path_reader = {
    "geert": read_ehv_pf34_paths_geert,
    "local": read_ehv_pf34_paths_local,
    "zenodo": read_ehv_pf34_paths_zenodo,
}


def read_eindhoven_pf34_paths(config: DictConfig) -> pd.DataFrame:
    """Read the Eindhoven platform 3-4 paths data set.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Eindhoven platform 3-4 paths.
    """
    path_reader = ehv_pf34_path_reader[config.params.data_source]
    df = path_reader(config)
    return df


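A sketch of selecting the Zenodo backend for this data set; every config value
shown is an assumption about what the Hydra config provides, not a verified
record id or column mapping:

    cfg = OmegaConf.create(
        {
            "params": {
                "data_source": "zenodo",
                "data_url": "https://zenodo.org/api/records",  # assumed
                "data_record_id": "1234567",  # placeholder
                "colnames": {"xf": "x_pos", "yf": "y_pos"},  # assumed names
            }
        }
    )
    df = read_eindhoven_pf34_paths(cfg)

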
def read_asdz_pf34_paths_local(config: DictConfig) -> pd.DataFrame:
    """Read the Amsterdam Zuid platform 3-4 paths data set from a local file.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Amsterdam Zuid platform 3-4 paths
    """
    trajectory_data_dir = Path(config.trajectory_data_dir)
    file_path = (
        trajectory_data_dir / "Amsterdam Zuid - platform 3-4 - set1.csv"
    )
    df = pd.read_csv(file_path)
    return df


def read_asdz_pf34_paths_4tu(config: DictConfig) -> pd.DataFrame:
    """Read the Amsterdam Zuid platform 3-4 paths data set from 4TU.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Amsterdam Zuid platform 3-4 paths
    """
    link = (
        "https://data.4tu.nl/file/7d78a5e3-6142-49fe-be03-e4c707322863/"
        "40ea5cd9-95dc-4e3c-8760-7f4dd543eae7"
    )
    bytestring = requests.get(link, timeout=10)
    with zipfile.ZipFile(io.BytesIO(bytestring.content), "r") as zipped_file:
        with zipped_file.open(
            "Amsterdam Zuid - platform 3-4 - set1.csv"
        ) as paths:
            df = pd.read_csv(paths)
    return df


asdz_pf34_path_reader = {
    "local": read_asdz_pf34_paths_local,
    "4tu": read_asdz_pf34_paths_4tu,
}


def read_asdz_pf34_paths(config: DictConfig) -> pd.DataFrame:
    """Read the Amsterdam Zuid platform 3-4 paths data set.

    The trajectories are read from local or remote sources based on the
    configuration. The spatial coordinates of the trajectories are converted
    from millimeters to meters.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Amsterdam Zuid platform 3-4 paths.
    """
    df = asdz_pf34_path_reader[config.params.data_source](config)
    # Convert spatial coordinates from millimeters to meters
    df["x_pos"] /= 1000
    df["y_pos"] /= 1000
    return df


def read_utrecht_pf5_paths_4tu(config: DictConfig):
    """Read the Utrecht Centraal platform 5 paths data set from 4TU.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Utrecht Centraal platform 5 paths
    """
    link = (
        "https://data.4tu.nl/file/d4d548c6-d198-49b3-986c-e22319970a5e/"
        "a58041fb-0318-4bee-9b2c-934bd8e5df83"
    )
    bytestring = requests.get(link, timeout=10)
    with zipfile.ZipFile(io.BytesIO(bytestring.content), "r") as zipped_file:
        with zipped_file.open(
            "Utrecht Centraal - platform 5 - set99.csv"
        ) as paths:
            df = pd.read_csv(paths)
    return df


def read_utrecht_pf5_paths_local(config: DictConfig):
    """Read the Utrecht Centraal platform 5 paths data set from a local file.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Utrecht Centraal platform 5 paths.
    """
    file_list = glob.glob(config.trajectory_data_dir + "/Utrecht*.csv")
    file_path = file_list[0]
    return pd.read_csv(file_path)


utrecht_pf5_path_reader = {
    "local": read_utrecht_pf5_paths_local,
    "4tu": read_utrecht_pf5_paths_4tu,
}


def read_utrecht_pf5_paths(config: DictConfig) -> pd.DataFrame:
    """Read the Utrecht Centraal platform 5 paths data set.

    The trajectories are read from local or remote sources based on the
    configuration. The spatial coordinates of the trajectories are converted
    from millimeters to meters.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Utrecht Centraal platform 5 paths.
    """
    path_reader = utrecht_pf5_path_reader[config.params.data_source]
    df = path_reader(config)
    # Only keep the columns that are needed
    df = df[["x_pos", "y_pos", "tracked_object", "timestampms"]]
    # Convert spatial coordinates from millimeters to meters
    df["x_pos"] /= 1000
    df["y_pos"] /= 1000
    return df


def read_asdz_pf12_paths_4tu(config: DictConfig) -> pd.DataFrame:
    """Read the Amsterdam Zuid platform 1-2 paths data set from 4TU.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Amsterdam Zuid platform 1-2 paths
    """
    link = (
        "https://data.4tu.nl/file/af4ef093-69ef-4e1c-8fbc-c40c447c618c/"
        "ca88bfc5-5a79-496a-8c90-433fa40929b9"
    )
    bytestring = requests.get(link, timeout=10)
    with zipfile.ZipFile(io.BytesIO(bytestring.content), "r") as zipped_file:
        with zipped_file.open(
            "Amsterdam Zuid - platform 1-2 - set10.csv"
        ) as paths:
            df = pd.read_csv(paths)
    return df


def read_asdz_pf12_paths_local(config: DictConfig) -> pd.DataFrame:
    """Read the Amsterdam Zuid platform 1-2 paths data set from a local file.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Amsterdam Zuid platform 1-2 paths
    """
    # TODO Change path string to pathlib path
    file_list = glob.glob(
        config.trajectory_data_dir + "/Amsterdam*Zuid*1-2*.csv"
    )
    file_path = file_list[0]
    return pd.read_csv(file_path)


asdz_pf12_path_reader = {
    "local": read_asdz_pf12_paths_local,
    "4tu": read_asdz_pf12_paths_4tu,
}


def read_asdz_pf12_paths(config: DictConfig) -> pd.DataFrame:
    """Read the Amsterdam Zuid platform 1-2 paths data set.

    The trajectories are read from local or remote sources based on the
    configuration. The spatial coordinates of the trajectories are converted
    from millimeters to meters.

    Args:
        config: The configuration parameters.

    Returns:
        The trajectory dataset with Amsterdam Zuid platform 1-2 paths.
    """
    df = asdz_pf12_path_reader[config.params.data_source](config)
    # Convert spatial coordinates from millimeters to meters
    df["x_pos"] /= 1000
    df["y_pos"] /= 1000
    return df


trajectory_reader = {
    "narrow_corridor": read_narrow_corridor_paths,
    "wide_corridor": read_wide_corridor_paths,
    "intersecting_paths": read_intersecting_paths,
    "curved_paths_synthetic": read_curved_paths_synthetic,
    "eindhoven_pf34": read_eindhoven_pf34_paths,
    "asdz_pf34": read_asdz_pf34_paths,
    "utrecht_pf5": read_utrecht_pf5_paths,
    "asdz_pf12": read_asdz_pf12_paths,
}


def read_trajectories(config) -> pd.DataFrame:
    """Read the trajectory data set selected by config.params.env_name."""
    log.info("Reading trajectories")
    env_name = config.params.env_name
    trajectories = trajectory_reader[env_name](config)
    return trajectories


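A minimal end-to-end sketch of this entry point; the environment name and data
source below are illustrative and assume the narrow corridor data is fetched
from 4TU:

    from omegaconf import OmegaConf

    cfg = OmegaConf.create(
        {"params": {"env_name": "narrow_corridor", "data_source": "4tu"}}
    )
    trajectories = read_trajectories(cfg)  # dispatches via trajectory_reader
    print(trajectories.head())

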
def get_background_image_local(config: DictConfig) -> np.ndarray:
    """Read the background image from a local file.

    Args:
        config: The configuration parameters.

    Returns:
        The background image as an image array.
    """
    image = plt.imread(
        Path(config.root_dir).parent / config.params.background.imgpath
    )
    return image


def get_background_image_from_remote_zip(config: DictConfig) -> np.ndarray:
    """Read the background image from a remote archive.

    Args:
        config: The configuration parameters.

    Returns:
        The background image as an image array.
    """
    link = config.params.background.img_link_4tu
    bytestring = requests.get(link, timeout=10)
    archive = zipfile.ZipFile(io.BytesIO(bytestring.content), "r")
    background_name = [x for x in archive.namelist() if x.endswith(".png")][0]
    with archive.open(background_name) as contents:
        image_data = contents.read()
    image_array = np.frombuffer(image_data, dtype=np.uint8)
    image = plt.imread(io.BytesIO(image_array))
    return image


def get_background_image_from_zenodo(
    config: DictConfig, filename: str = None
) -> Image.Image:
    """Read image file from Zenodo using PIL.

    Args:
        config: The configuration parameters.
        filename: Name of file to read. If None, will read first image
            file found.

    Returns:
        PIL Image object
    """
    # Get file information
    pattern = (
        filename if filename else "*.png"
    )  # Can be extended to support other formats
    files = list_zenodo_files(config, pattern)
    if not files:
        raise ValueError(
            "No matching image files found in record "
            f"{config.params.data_record_id}"
        )
    # Use first matching file if filename not specified
    file_info = files[0]
    file_url = file_info["links"]["self"]
    # Get data into memory
    data = get_zenodo_data(file_url)
    # Read image using PIL
    return Image.open(data)


read_background_image = {
    "local": get_background_image_local,
    "4tu": get_background_image_from_remote_zip,
    "zenodo": get_background_image_from_zenodo,
}
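
The background-image readers follow the same registry pattern as the
trajectory readers. A usage sketch, assuming the image backend is selected by
params.data_source just like the trajectory backend (a hypothetical but
consistent choice):

    source = cfg.params.data_source  # e.g. "local", "4tu" or "zenodo"
    background = read_background_image[source](cfg)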