Source code for Algorithms.CSA.std_csa

"""
Module contains standard Connection Scan Algorithm (CSA) implementation.
"""

from tqdm import tqdm

from Algorithms.CSA.csa_functions import *


[docs]def std_csa(SOURCE: int, DESTINATION: int, D_TIME, connections_list: list, WALKING_FROM_SOURCE: int, footpath_dict: dict, PRINT_ITINERARY: int) -> tuple: """ Standard CSA implementation Args: SOURCE (int): stop id of source stop. DESTINATION (int): stop id of destination stop. D_TIME (pandas.datetime): departure time. connections_list: WALKING_FROM_SOURCE (int): 1 or 0. 1 indicates walking from SOURCE is allowed. footpath_dict (dict): preprocessed dict. Format {from_stop_id: [(to_stop_id, footpath_time)]}. PRINT_ITINERARY (int): 1 or 0. 1 means print complete path. Returns: output (tuple): tuple containing the best arrival time. Examples: >>> output = std_csa(36, 52, pd.to_datetime('2022-06-30 06:30:00'), connections_list, 1, footpath_dict, 1) >>> print(f"Optimal arrival time is: {output}") """ stop_label, trip_set, pi_label, inf_time = initialize_csa(SOURCE, WALKING_FROM_SOURCE, footpath_dict, D_TIME) for idx, departure_stop, arrival_stop, departure_time, arrival_time, tid in tqdm(connections_list): if departure_time < D_TIME: continue else: if departure_time > stop_label[DESTINATION]: if PRINT_ITINERARY == 1: print("Terminated due to time-based target pruning") break if trip_set[tid] or stop_label[departure_stop] <= departure_time: if stop_label[arrival_stop] > arrival_time: stop_label[arrival_stop] = arrival_time pi_label[arrival_stop] = ('connection', idx) trip_set[tid] = True try: for footpath_stop, duration in footpath_dict[arrival_stop]: if stop_label[footpath_stop] > arrival_time + duration: stop_label[footpath_stop] = arrival_time + duration pi_label[footpath_stop] = ("walking", arrival_stop, footpath_stop, duration) except KeyError: pass output = post_process_csa(SOURCE, DESTINATION, pi_label, PRINT_ITINERARY, connections_list, stop_label, inf_time) return output