-
Notifications
You must be signed in to change notification settings - Fork 5
Add richer observability to the simulation outputs #427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d1e0a71
0775693
2b2cbc8
94abe6d
f5e0288
25b8a39
95deab0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| """Run a 24-hour traffic simulation on a synthetic Manhattan-style grid. | ||
|
|
||
| This script generates grid cartography CSV files, builds a road network, | ||
| configures the dynamics engine, and simulates agent flow with a 1-second | ||
| integration step and 10-second agent insertion cadence. | ||
| """ | ||
|
|
||
| import argparse | ||
|
||
| from datetime import datetime | ||
| import logging | ||
|
|
||
| from dsf.cartography import create_manhattan_cartography | ||
| from dsf.mobility import ( | ||
|
||
| RoadNetwork, | ||
| Dynamics, | ||
| AgentInsertionMethod, | ||
| ) | ||
|
|
||
| from tqdm import trange | ||
Check warningCode scanning / Prospector (reported by Codacy) Unable to import 'tqdm' (import-error) Warning
Unable to import 'tqdm' (import-error)
Check warningCode scanning / Pylintpython3 (reported by Codacy) third party import "from tqdm import trange" should be placed before "from dsf.cartography import create_manhattan_cartography" Warning
third party import "from tqdm import trange" should be placed before "from dsf.cartography import create_manhattan_cartography"
|
||
| from numba import cfunc, float64 | ||
Check warningCode scanning / Prospector (reported by Codacy) Unable to import 'numba' (import-error) Warning
Unable to import 'numba' (import-error)
Check warningCode scanning / Pylintpython3 (reported by Codacy) third party import "from numba import cfunc, float64" should be placed before "from dsf.cartography import create_manhattan_cartography" Warning
third party import "from numba import cfunc, float64" should be placed before "from dsf.cartography import create_manhattan_cartography"
|
||
| import numpy as np | ||
Check warningCode scanning / Prospector (reported by Codacy) Unable to import 'numpy' (import-error) Warning
Unable to import 'numpy' (import-error)
Check warningCode scanning / Pylintpython3 (reported by Codacy) third party import "import numpy as np" should be placed before "from dsf.cartography import create_manhattan_cartography" Warning
third party import "import numpy as np" should be placed before "from dsf.cartography import create_manhattan_cartography"
|
||
|
|
||
|
|
||
| @cfunc(float64(float64, float64), nopython=True, cache=True) | ||
| def custom_speed(max_speed, density): | ||
|
||
| """Compute a density-aware speed multiplier for custom speed modeling.""" | ||
| if density < 0.35: | ||
| return max_speed * (0.9 - 0.1 * density) | ||
| return max_speed * (1.2 - 0.7 * density) | ||
|
|
||
|
|
||
| logging.basicConfig( | ||
| level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" | ||
| ) | ||
|
|
||
| if __name__ == "__main__": | ||
| parser = argparse.ArgumentParser() | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "parser" doesn't conform to UPPER_CASE naming style Warning
Constant name "parser" doesn't conform to UPPER_CASE naming style
|
||
| parser.add_argument( | ||
| "--seed", type=int, default=69, help="Random seed for reproducibility" | ||
| ) | ||
| parser.add_argument( | ||
| "--dim", type=str, default="12x12", help="Dimensions of the grid (e.g., 10x10)" | ||
| ) | ||
| parser.add_argument( | ||
| "--amp", type=int, required=True, help="Amplitude of the vehicle input" | ||
| ) | ||
| args = parser.parse_args() | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "args" doesn't conform to UPPER_CASE naming style Warning
Constant name "args" doesn't conform to UPPER_CASE naming style
|
||
| np.random.seed(args.seed) | ||
|
|
||
| # Parse the grid dimensions | ||
| try: | ||
| rows, cols = map(int, args.dim.split("x")) | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "rows" doesn't conform to UPPER_CASE naming style Warning
Constant name "rows" doesn't conform to UPPER_CASE naming style
Check warningCode scanning / Pylint (reported by Codacy) Constant name "cols" doesn't conform to UPPER_CASE naming style Warning
Constant name "cols" doesn't conform to UPPER_CASE naming style
|
||
| except ValueError: | ||
| raise ValueError( | ||
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Consider explicitly re-raising using the 'from' keyword Note
Consider explicitly re-raising using the 'from' keyword
|
||
| "Invalid grid dimensions. Please use the format 'rowsxcols' (e.g., 10x10)." | ||
| ) | ||
|
|
||
| logging.info(f"Creating manhattan cartography for {rows}x{cols} grid...") | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| # Get the cartography of the specified city | ||
| df_edges, df_nodes = create_manhattan_cartography(rows, cols) | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "df_edges" doesn't conform to UPPER_CASE naming style Warning
Constant name "df_edges" doesn't conform to UPPER_CASE naming style
Check warningCode scanning / Pylint (reported by Codacy) Constant name "df_nodes" doesn't conform to UPPER_CASE naming style Warning
Constant name "df_nodes" doesn't conform to UPPER_CASE naming style
|
||
|
|
||
| df_nodes["type"] = ( | ||
| "traffic_signals" # Set all nodes as traffic lights for simplicity | ||
| ) | ||
|
|
||
| df_edges.to_csv(f"grid_{args.dim}_edges.csv", sep=";", index=False) | ||
| df_nodes.to_csv(f"grid_{args.dim}_nodes.csv", sep=";", index=False) | ||
|
|
||
| del df_edges, df_nodes | ||
|
|
||
| logging.info("Creating road network and dynamics model...") | ||
|
|
||
| # Create a road network from the cartography | ||
| road_network = RoadNetwork() | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "road_network" doesn't conform to UPPER_CASE naming style Warning
Constant name "road_network" doesn't conform to UPPER_CASE naming style
|
||
| road_network.importEdges(f"grid_{args.dim}_edges.csv", ";") | ||
| road_network.importNodeProperties(f"grid_{args.dim}_nodes.csv", ";") | ||
| # Adjust network parameters | ||
| road_network.adjustNodeCapacities() | ||
| road_network.autoMapStreetLanes() | ||
| road_network.autoAssignRoadPriorities() | ||
| road_network.autoInitTrafficLights() | ||
| road_network.describe() | ||
|
|
||
| # Generate a random vector of integer values for vehicle input | ||
| # We want values to have a 10s entry for a whole day | ||
| vehicle_input = np.random.normal(args.amp, args.amp * 0.1, size=8640) | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "vehicle_input" doesn't conform to UPPER_CASE naming style Warning
Constant name "vehicle_input" doesn't conform to UPPER_CASE naming style
|
||
| vehicle_input = np.clip(vehicle_input, 0, None).astype(int) | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "vehicle_input" doesn't conform to UPPER_CASE naming style Warning
Constant name "vehicle_input" doesn't conform to UPPER_CASE naming style
|
||
|
|
||
| # Create a dynamics model for the road network | ||
| dynamics = Dynamics(road_network, seed=args.seed) | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "dynamics" doesn't conform to UPPER_CASE naming style Warning
Constant name "dynamics" doesn't conform to UPPER_CASE naming style
|
||
| # To use a custom speed function, you must pass the pointer to the compiled function using the address attribute | ||
Check warningCode scanning / Pylint (reported by Codacy) Line too long (116/100) Warning
Line too long (116/100)
Check warningCode scanning / Pylintpython3 (reported by Codacy) Line too long (116/100) Warning
Line too long (116/100)
|
||
| # dynamics.setSpeedFunction(SpeedFunction.CUSTOM, custom_speed.address) | ||
| # Get epoch time of today at midnight | ||
| epoch_time = int( | ||
Check warningCode scanning / Pylint (reported by Codacy) Constant name "epoch_time" doesn't conform to UPPER_CASE naming style Warning
Constant name "epoch_time" doesn't conform to UPPER_CASE naming style
|
||
| datetime.combine(datetime.today(), datetime.min.time()).timestamp() | ||
| ) | ||
|
|
||
| dynamics.setMeanTravelDistance(10e3) # Set mean travel distance to 10 km | ||
| dynamics.killStagnantAgents(40.0) | ||
| dynamics.setInitTime(epoch_time) | ||
| dynamics.connectDataBase(f"grid_{args.dim}.db") | ||
| dynamics.saveData(300, True, True, True) | ||
|
|
||
| # Simulate traffic for 24 hours with a time step of 1 seconds | ||
| for time_step in trange(86400): | ||
| # Update paths every 5 minutes (300 seconds) | ||
| if time_step % 300 == 0: | ||
| dynamics.updatePaths() | ||
| # Add agents every 10 seconds | ||
| if time_step % 10 == 0: | ||
| dynamics.addAgents( | ||
| vehicle_input[time_step // 10], AgentInsertionMethod.RANDOM | ||
| ) | ||
| dynamics.evolve(False) | ||
|
|
||
| dynamics.summary() | ||
Uh oh!
There was an error while loading. Please reload this page.