Source code for Algorithms.RAPTOR.std_raptor

"""
Module contains RAPTOR implementation.
"""

from Algorithms.RAPTOR.raptor_functions import *

[docs]def raptor(SOURCE: int, DESTINATION: int, D_TIME, MAX_TRANSFER: int, WALKING_FROM_SOURCE: int, CHANGE_TIME_SEC: int, PRINT_ITINERARY: int, routes_by_stop_dict: dict, stops_dict: dict, stoptimes_dict: dict, footpath_dict: dict, idx_by_route_stop_dict: dict) -> list: ''' Standard Raptor implementation Args: SOURCE (int): stop id of source stop. DESTINATION (int): stop id of destination stop. D_TIME (pandas.datetime): departure time. MAX_TRANSFER (int): maximum transfer limit. WALKING_FROM_SOURCE (int): 1 or 0. 1 indicates walking from SOURCE is allowed. CHANGE_TIME_SEC (int): change-time in seconds. PRINT_ITINERARY (int): 1 or 0. 1 means print complete path. routes_by_stop_dict (dict): preprocessed dict. Format {stop_id: [id of routes passing through stop]}. stops_dict (dict): preprocessed dict. Format {route_id: [ids of stops in the route]}. stoptimes_dict (dict): preprocessed dict. Format {route_id: [[trip_1], [trip_2]]}. footpath_dict (dict): preprocessed dict. Format {from_stop_id: [(to_stop_id, footpath_time)]}. idx_by_route_stop_dict (dict): preprocessed dict. Format {(route id, stop id): stop index in route}. Returns: out (list): list of pareto-optimal arrival timestamps. Examples: >>> output = raptor(36, 52, pd.to_datetime('2022-06-30 05:41:00'), 4, 1, 0, 1, routes_by_stop_dict, stops_dict, stoptimes_dict, footpath_dict, idx_by_route_stop_dict) >>> print(f"Optimal arrival time are: {output}") See Also: HypRAPTOR, Tip-based Public Transit Routing (TBTR) ''' out = [] # Initialization marked_stop, marked_stop_dict, label, pi_label, star_label, inf_time = initialize_raptor(routes_by_stop_dict, SOURCE, MAX_TRANSFER) change_time = pd.to_timedelta(CHANGE_TIME_SEC, unit='seconds') (label[0][SOURCE], star_label[SOURCE]) = (D_TIME, D_TIME) Q = {} # Format of Q is {route:stop index} if WALKING_FROM_SOURCE == 1: try: trans_info = footpath_dict[SOURCE] for i in trans_info: (p_dash, to_pdash_time) = i label[0][p_dash] = D_TIME + to_pdash_time star_label[p_dash] = D_TIME + to_pdash_time pi_label[0][p_dash] = ('walking', SOURCE, p_dash, to_pdash_time, D_TIME + to_pdash_time) if marked_stop_dict[p_dash] == 0: marked_stop.append(p_dash) marked_stop_dict[p_dash] = 1 except KeyError: pass # Main Code # Main code part 1 for k in range(1, MAX_TRANSFER + 1): Q.clear() while marked_stop: p = marked_stop.pop() marked_stop_dict[p] = 0 try: routes_serving_p = routes_by_stop_dict[p] for route in routes_serving_p: stp_idx = idx_by_route_stop_dict[(route, p)] try: Q[route] = min(stp_idx, Q[route]) except KeyError: Q[route] = stp_idx except KeyError: continue # Main code part 2 for route, current_stopindex_by_route in Q.items(): current_trip_t = -1 for p_i in stops_dict[route][current_stopindex_by_route:]: if current_trip_t != -1 and current_trip_t[current_stopindex_by_route][1] < min(star_label[p_i], star_label[DESTINATION]): arr_by_t_at_pi = current_trip_t[current_stopindex_by_route][1] label[k][p_i], star_label[p_i] = arr_by_t_at_pi, arr_by_t_at_pi pi_label[k][p_i] = (boarding_time, boarding_point, p_i, arr_by_t_at_pi, tid) if marked_stop_dict[p_i] == 0: marked_stop.append(p_i) marked_stop_dict[p_i] = 1 if current_trip_t == -1 or label[k - 1][p_i] + change_time < current_trip_t[current_stopindex_by_route][ 1]: # assuming arrival_time = departure_time tid, current_trip_t = get_latest_trip_new(stoptimes_dict, route, label[k - 1][p_i], current_stopindex_by_route, change_time) if current_trip_t == -1: boarding_time, boarding_point = -1, -1 else: boarding_point = p_i boarding_time = current_trip_t[current_stopindex_by_route][1] current_stopindex_by_route = current_stopindex_by_route + 1 # Main code part 3 marked_stop_copy = [*marked_stop] for p in marked_stop_copy: try: trans_info = footpath_dict[p] for i in trans_info: (p_dash, to_pdash_time) = i new_p_dash_time = label[k][p] + to_pdash_time if label[k][p_dash] > new_p_dash_time and new_p_dash_time < min(star_label[p_dash], star_label[DESTINATION]): label[k][p_dash], star_label[p_dash] = new_p_dash_time, new_p_dash_time pi_label[k][p_dash] = ('walking', p, p_dash, to_pdash_time, new_p_dash_time) if marked_stop_dict[p_dash] == 0: marked_stop.append(p_dash) marked_stop_dict[p_dash] = 1 except KeyError: continue # Main code End if marked_stop == deque([]): if PRINT_ITINERARY == 1: # print('code ended with termination condition') pass break _, _, rap_out = post_processing(DESTINATION, pi_label, PRINT_ITINERARY, label) out.append(rap_out) return out