import glob
import logging
import shutil
from pathlib import Path
import hydra
import matplotlib.pyplot as plt
from physped.core.lattice_selection import evaluate_selection_range
from physped.core.parametrize_potential import (
learn_potential_from_trajectories,
)
from physped.core.pedestrian_simulator import simulate_pedestrians
from physped.core.slow_dynamics import compute_slow_dynamics
from physped.io.readers import trajectory_reader
from physped.io.writers import save_piecewise_potential
from physped.preprocessing.trajectories import preprocess_trajectories
from physped.utils.config_utils import (
log_configuration,
register_new_resolvers,
)
from physped.visualization.plot_discrete_grid import plot_discrete_grid
from physped.visualization.plot_histograms import (
compute_joint_kl_divergence,
create_all_histograms,
plot_multiple_histograms,
save_joint_kl_divergence_to_file,
)
from physped.visualization.plot_potential_at_slow_index import (
plot_potential_at_slow_index,
)
from physped.visualization.plot_trajectories import plot_trajectories
log = logging.getLogger(__name__)
[docs]
@hydra.main(version_base=None, config_path="conf", config_name="config")
def model(config):
env_name = config.params.env_name
plt.style.use(Path(config.root_dir) / config.plot_style)
log_configuration(config)
log.info("READING TRAJECTORIES")
trajectories = trajectory_reader[env_name](config)
log.info("PREPROCESSING TRAJECTORIES")
preprocessed_trajectories = preprocess_trajectories(
trajectories, config=config
)
# TODO Check if input_ntrajs is still needed
# config.params.input_ntrajs = len(preprocessed_trajectories.Pid.unique())
log.info("PROCESSING SLOW MODES")
preprocessed_trajectories = compute_slow_dynamics(
preprocessed_trajectories, config=config
)
log.info("LEARNING POTENTIAL")
piecewise_potential = learn_potential_from_trajectories(
preprocessed_trajectories, config
)
if config.save.piecewise_potential and not config.read.piecewise_potential:
save_piecewise_potential(
piecewise_potential,
Path.cwd().parent,
config.filename.piecewise_potential,
)
log.info("SIMULATING TRAJECTORIES")
simulated_trajectories = simulate_pedestrians(
piecewise_potential, config, preprocessed_trajectories
)
log.info("PLOTTING FIGURES")
# * Optional plotting of preprocessed trajectories
if config.plot.preprocessed_trajectories:
log.info("Plot preprocessed trajectories.")
log.debug(
"Configuration 'plot.preprocessed_trajectories' is set to True."
)
plot_trajectories(preprocessed_trajectories, config, "recorded")
plot_trajectories(
preprocessed_trajectories, config, "recorded", traj_type="s"
)
else:
log.warning(
"Configuration 'plot.preprocessed_trajectories' is set to False."
)
# * Optional plotting of simulated trajectories
if config.plot.simulated_trajectories:
log.info("Plot simulated trajectories")
log.debug(
"Configuration 'plot.simulated_trajectories' is set to True."
)
config.params.trajectory_plot.plot_intended_path = False
plot_trajectories(simulated_trajectories, config, "simulated")
plot_trajectories(
simulated_trajectories, config, "simulated", traj_type="s"
)
else:
log.warning(
"Configuration 'plot.simulated_trajectories' is set to False."
)
# * Optional plotting of probability distributions
if config.plot.histograms:
log.info("Plot probability distribution comparison.")
log.debug("Configuration 'plot.histograms' is set to True.")
observables = ["xf", "yf", "uf", "vf"]
config.params.simulation.ntrajs = len(
simulated_trajectories.Pid.unique()
)
histograms = create_all_histograms(
preprocessed_trajectories, simulated_trajectories, config
)
joint_kl_divergence = compute_joint_kl_divergence(
piecewise_potential, simulated_trajectories
)
save_joint_kl_divergence_to_file(joint_kl_divergence, config)
plot_multiple_histograms(observables, histograms, "PDF", config)
else:
log.warning(
"Configuration 'plot.simulated_trajectories' is set to False."
)
config = evaluate_selection_range(config)
selection = config.params.selection.range
slow_indices = (
selection.x_indices[0],
selection.y_indices[0],
selection.r_indices[0],
selection.theta_indices[0],
selection.k_indices[0],
)
# * Optional plotting of the grid
if config.plot.grid:
log.info("Plot the configuration of the grid.")
log.debug("Configuration 'plot.grid' is set to True.")
plot_discrete_grid(config, slow_indices, preprocessed_trajectories)
else:
log.warning("Configuration 'plot.grid' is set to False.")
# * Optional plotting of the potential
if config.plot.potential_at_selection:
log.info("Plot potential at selection.")
log.debug(
"Configuration 'plot.potential_at_selection' is set to True."
)
plot_potential_at_slow_index(config, slow_indices, piecewise_potential)
else:
log.warning(
"Configuration 'plot.potential_at_selection' is set to False."
)
output_figures = glob.glob("*.pdf")
log.info("Pulling output figures to the parent directory for easy access.")
for figure in output_figures:
shutil.copyfile(figure, Path.cwd().parent / figure)
[docs]
def main():
register_new_resolvers()
model()
if __name__ == "__main__":
main()