Source code for physped.core.pedestrian_simulator

import logging
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

from physped.core.langevin_model import LangevinModel
from physped.core.parametrize_potential import get_grid_indices
from physped.core.pedestrian_initializer import (
    get_initial_dynamics,
    initialize_pedestrians,
)
from physped.core.piecewise_potential import PiecewisePotential
from physped.utils.functions import (
    cartesian_to_polar_coordinates,
    periodic_angular_conditions,
)

log = logging.getLogger(__name__)


[docs] def simulate_pedestrians( piecewise_potential: PiecewisePotential, config: dict, ) -> pd.DataFrame: parameters = config.params # if config.read.simulated_trajectories: # return read_simulated_trajectories_from_file(config) log.info("Simulate trajectories using the piecewise potential") initial_dynamics = get_initial_dynamics(config) origins = initialize_pedestrians(initial_dynamics) # !Make sure that the potential is defined for the states we sample # states_to_sample_from = potential_defined_at_slow_state( # states_to_sample_from, piecewise_potential # ) # states_to_sample_from = states_to_sample_from[ # states_to_sample_from["potential_defined"] # ].copy() evaluation_time = np.arange( parameters.simulation.start, parameters.simulation.end, parameters.simulation.step, ) trajectories = [] model = LangevinModel(piecewise_potential, parameters) n_frames_back = config.params.fps // 2 # Go 0.5 second back with logging_redirect_tqdm(): for starting_state in tqdm( origins[:, :11], desc="Simulating trajectories", unit="trajs", total=origins.shape[0], miniters=1, ): first_trajectory_piece = simulate_trajectory_piece( model, starting_state, evaluation_time, 0 ) pid = int(first_trajectory_piece.iloc[0]["Pid"]) trajectory_pieces = [first_trajectory_piece] while check_restarting_conditions( trajectory_pieces[-1], n_frames_back, piecewise_potential ): last_trajectory_piece = trajectory_pieces[-1] no_last_trajectory_piece = int( last_trajectory_piece.iloc[0]["piece_id"] ) restarting_state = last_trajectory_piece.iloc[-n_frames_back] restarting_time = restarting_state["t"] log.info( "Pid %s piece %s: Reverting %s frames -> t = %.2f.", int(pid), no_last_trajectory_piece, n_frames_back, restarting_time, ) last_trajectory_piece = last_trajectory_piece.iloc[ : -n_frames_back - 1 ] # strip frames from last trajectory piece trajectory_pieces[-1] = last_trajectory_piece frame_to_restart_from = int(restarting_state["k"]) new_evaluation_time = evaluation_time[frame_to_restart_from:] no_new_traj_piece = no_last_trajectory_piece + 1 new_trajectory_piece = simulate_trajectory_piece( model, restarting_state, new_evaluation_time, no_new_traj_piece, ) trajectory_pieces.append(new_trajectory_piece) trajectory_pieces = pd.concat(trajectory_pieces) trajectories.append(trajectory_pieces) trajectories = pd.concat(trajectories).dropna() trajectories["rf"], trajectories["thetaf"] = ( cartesian_to_polar_coordinates(trajectories.uf, trajectories.vf) ) trajectories["rs"], trajectories["thetas"] = ( cartesian_to_polar_coordinates(trajectories.us, trajectories.vs) ) trajectories["thetaf"] = periodic_angular_conditions( trajectories["thetaf"], config.params.grid.bins["theta"] ) trajectories["thetas"] = periodic_angular_conditions( trajectories["thetas"], config.params.grid.bins["theta"] ) # if config.save.simulated_trajectories: # log.debug( # "Configuration 'save.simulated_trajectories' is set to True." # ) # save_trajectories( # trajectories, # Path.cwd().parent, # config.filename.simulated_trajectories, # ) return trajectories
# def sample_trajectory_origins_from_trajectories(piecewise_potential: # PiecewisePotential, parameters: dict) -> np.ndarray: # ntrajs = np.min([parameters.simulation.ntrajs, parameters.input_ntrajs]) # sampled_origins = piecewise_potential.trajectory_origins.sample(n=ntrajs) # # Stacking to go from [x,y,u,v] to [x,y,u,v,xs,ys,us,vs] # sampled_origins = np.hstack((sampled_origins, sampled_origins)) # log.info("Sampled %d origins from the input trajectories.", ntrajs) # return sampled_origins
[docs] def read_simulated_trajectories_from_file(config): log.debug("Configuration 'read.simulated_trajectories' is set to True.") filepath = Path.cwd().parent / config.filename.simulated_trajectories try: simulated_trajectories = pd.read_csv(filepath) log.warning("Simulated trajectories read from file") # log.debug("Filepath %s", filepath.relative_to(config.root_dir)) return simulated_trajectories except FileNotFoundError as e: log.error("Preprocessed trajectories not found: %s", e)
[docs] def heatmap_zero_at_slow_state( piecewise_potential: PiecewisePotential, slow_state ) -> bool: """Check if the heatmap is zero at the given position. If the heatmap is zero it indicates that the spatial position was never visited by a pedestrian in the input data. Parameters: piecewise_potential: The piecewise potential object. position: The position to check. Returns: True if the heatmap is zero at the given position, False otherwise. """ heatmap = np.sum(piecewise_potential.histogram, axis=(2, 3, 4)) slow_state_index = get_grid_indices(piecewise_potential, slow_state) return heatmap[slow_state_index[0], slow_state_index[1]] == 0
[docs] def check_restarting_conditions(traj, n_frames_back, piecewise_potential): # traj is the last trajectory_piece last_trajectory_piece_too_short = len(traj) < n_frames_back if last_trajectory_piece_too_short: log.warning( "Last trajectory piece has only %s frames. Not reverting.", len(traj), ) return False trajectory_already_left_the_measurement_domain = ( heatmap_zero_at_slow_state( piecewise_potential, traj.iloc[-1][["xs", "ys", "us", "vs", "k"]] ) ) if trajectory_already_left_the_measurement_domain: log.warning( "Last trajectory piece already left the measurement domain. " "Not reverting." ) return False particle_left_the_lattice = np.all(np.isinf(traj.iloc[-1])) if particle_left_the_lattice: log.warning( "Last trajectory piece already left the lattice. Not reverting." ) return False max_restarts = 2 simulation_already_restarted_too_often = ( traj["piece_id"].iloc[0] > max_restarts ) if simulation_already_restarted_too_often: log.warning( "Simulation already restarted %s times. Not reverting.", max_restarts, ) return False return True
[docs] def simulate_trajectory_piece( model: LangevinModel, starting_state: np.ndarray, evaluation_time: np.ndarray, no_traj_piece: int, ) -> pd.DataFrame: integration_output = model.simulate(starting_state[:11], evaluation_time) columns = ["xf", "yf", "uf", "vf", "xs", "ys", "us", "vs", "t", "k", "Pid"] trajectory_piece = pd.DataFrame(integration_output, columns=columns) trajectory_piece.replace([np.inf, -np.inf], np.nan, inplace=True) trajectory_piece.dropna(inplace=True) trajectory_piece["piece_id"] = no_traj_piece return trajectory_piece