Need help with this Python Project
# coding=utf-8
from networkx import Graph
class ExplorableGraph(object):
    """
    Keeps track of "explored nodes" i.e. nodes that have been queried from the
    graph.

    Delegates graph operations to a networkx.Graph. Every call to
    ``graph[n]`` or ``graph.neighbors(n)`` increments the exploration counter
    of ``n``; any attribute not defined here is looked up on the wrapped
    graph without being counted.
    """

    def __init__(self, graph):
        """
        :type graph: Graph
        """
        self.__graph = graph
        self._explored_nodes = {node: 0 for node in self.__graph.nodes()}

    def explored_nodes(self):
        """Return the dict mapping each node to its exploration count."""
        return self._explored_nodes

    def __getattr__(self, item):
        """Delegate unknown attributes to the wrapped graph."""
        # ``self.__graph`` is name-mangled; if it has not been set yet (e.g.
        # while copying or unpickling) looking it up would re-enter
        # __getattr__ forever, so fail fast instead.
        if item == '_ExplorableGraph__graph':
            raise AttributeError(item)
        return getattr(self.__graph, item)

    def reset_search(self):
        """Reset every exploration counter to zero."""
        self._explored_nodes = {node: 0 for node in self.__graph.nodes()}

    def _mark_explored(self, n):
        """Increment the counter of ``n`` if it is a node of the graph."""
        if n in self.__graph.nodes():
            self._explored_nodes[n] = self._explored_nodes.get(n, 0) + 1

    def __iter__(self):
        return self.__graph.__iter__()

    def __getitem__(self, n):
        """Return the adjacency of ``n`` and count ``n`` as explored."""
        self._mark_explored(n)
        return self.__graph.__getitem__(n)

    def nodes_iter(self, data=False):
        """Iterate over all nodes, counting each of them as explored once.

        The counters stay a dict (the original replaced them with a set,
        which broke every later ``+= 1``), and ``nodes(data=...)`` is used
        because ``Graph.nodes_iter`` no longer exists in networkx 2.
        """
        for node in self.__graph.nodes():
            self._explored_nodes[node] = self._explored_nodes.get(node, 0) + 1
        return iter(self.__graph.nodes(data=data))

    def neighbors(self, n):
        """Return the neighbors of ``n`` and count ``n`` as explored."""
        self._mark_explored(n)
        return self.__graph.neighbors(n)

    def get_edge_weight(self, u, v):
        """Return the 'weight' attribute of edge (u, v) without counting."""
        return self.__graph.get_edge_data(u, v)['weight']
"""
Read graphs in Open Street Maps osm format.

Based on osm.py from brianw's osmgeocode
(http://github.com/brianw/osmgeocode), which is in turn based on osm.py from
Graphserver (http://github.com/bmander/graphserver/tree/master),
copyright (c) 2007 Brandon Martin-Anderson, under the BSD License.
"""
import sys
# Extend the module search path with the project's lib directories; both
# paths are relative to the current working directory.
sys.path.append('./lib')
sys.path.append('./workspace/lib')
import xml.sax
import copy
import networkx
def download_osm(left, bottom, right, top):
    """Return a filehandle to the downloaded data.

    Requests the map for the bounding box ``left,bottom,right,top`` (degrees
    of longitude/latitude) from the OpenStreetMap API and returns the open
    response object; the caller is responsible for reading and closing it.
    """
    # ``urllib.urlopen`` only exists on Python 2; try the Python 3 location
    # first and fall back for older interpreters.
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib import urlopen
    # API 0.5 has been retired; 0.6 is the currently served version.
    url = "https://api.openstreetmap.org/api/0.6/map?bbox=%f,%f,%f,%f" % (
        left, bottom, right, top)
    return urlopen(url)
def read_osm(filename_or_stream, only_roads=True):
    """Read graph in OSM format from file specified by name or by stream object.

    Parameters
    ----------
    filename_or_stream : filename or stream object
    only_roads : bool
        When true, ways without a 'highway' tag are skipped.

    Returns
    -------
    G : Graph
        Undirected graph whose nodes are OSM node ids. Each node carries its
        ``Node`` object under the 'data' attribute; each edge carries the
        way id under 'id' and the ``Way`` object under 'data'.

    Examples
    --------
    >>> G = read_osm(download_osm(-122.33, 47.60, -122.31, 47.61))
    >>> plot([G.nodes[n]['data'].lat for n in G], [G.nodes[n]['data'].lon for n in G], ',')
    """
    osm = OSM(filename_or_stream)
    G = networkx.Graph()
    for w in osm.ways.values():
        if only_roads and 'highway' not in w.tags:
            continue
        # Graph.add_path was removed from networkx; adding the consecutive
        # node pairs as edges works on every networkx version.
        G.add_edges_from(zip(w.nds[:-1], w.nds[1:]), id=w.id, data=w)
    # Snapshot the node list so attributes can be attached while looping.
    for n_id in list(G.nodes()):
        G.add_node(n_id, data=osm.nodes[n_id])
    return G
class Node:
    """A single OSM node: an id, a position and a dict of tags."""

    def __init__(self, id, lon, lat):
        self.id = id
        self.lon = lon
        self.lat = lat
        self.pos = (lat, lon)
        self.tags = {}

    def __str__(self):
        return str(self.id)

    def __getitem__(self, key):
        """Dict-style access to 'id', 'pos' and 'tags'.

        'pos' returns the same (lat, lon) tuple as the ``pos`` attribute; the
        original returned (lon, lat) here, which disagreed with the attribute
        and with the (lat, lon) order the haversine conversion expects.

        Raises:
            KeyError: for any other key.
        """
        if key == 'id':
            return self.id
        if key == 'pos':
            return self.pos
        if key == 'tags':
            return self.tags
        raise KeyError(key)

    def __eq__(self, other):
        """Nodes are equal when their ids are equal."""
        if isinstance(other, Node):
            return self.id == other.id
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.id)
class Way:
    """An OSM way: an ordered list of node ids plus a dict of tags."""

    def __init__(self, id, osm):
        self.osm = osm
        self.id = id
        self.nds = []
        self.tags = {}

    def split(self, dividers):
        """Split this way at every interior node used more than once.

        Args:
            dividers: mapping from node id to the number of times the node is
                referenced; an interior node with a count above 1 ends one
                slice and starts the next (it appears in both).

        Returns:
            A list of shallow copies of this way, one per slice, with ids
            ``"<id>-0"``, ``"<id>-1"``, ... and ``nds`` set to the slice.
        """
        # Iterative slicing: the recursive version used one stack frame per
        # cut and overflowed on ways with many shared nodes.
        slices = []
        start = 0
        for i in range(1, len(self.nds) - 1):
            if dividers[self.nds[i]] > 1:
                slices.append(self.nds[start:i + 1])
                start = i
        slices.append(self.nds[start:])

        # Create a way object for each node-array slice.
        ret = []
        for index, node_slice in enumerate(slices):
            littleway = copy.copy(self)
            littleway.id += "-%d" % index
            littleway.nds = node_slice
            ret.append(littleway)
        return ret
class OSM:
    """Parsed OSM document: ``nodes`` by id and road segments in ``ways``."""

    def __init__(self, filename_or_stream):
        """File can be either a filename or stream/file object.

        After parsing, ways with fewer than two nodes are dropped and every
        remaining way is split at nodes that are referenced more than once,
        so ``self.ways`` maps split-way ids (``"<id>-<n>"``) to ``Way``
        objects.
        """
        nodes = {}
        ways = {}
        superself = self

        class OSMHandler(xml.sax.ContentHandler):
            """Collect <node> and <way> elements with their tags and refs."""

            def __init__(self):
                xml.sax.ContentHandler.__init__(self)
                # Element currently being built; None outside node/way.
                self.currElem = None

            def startElement(self, name, attrs):
                if name == 'node':
                    self.currElem = Node(attrs['id'], float(attrs['lon']),
                                         float(attrs['lat']))
                elif name == 'way':
                    self.currElem = Way(attrs['id'], superself)
                elif self.currElem is None:
                    # <tag>/<nd> outside a node or way (e.g. in a relation)
                    # must not be attached to the previously parsed element.
                    return
                elif name == 'tag':
                    self.currElem.tags[attrs['k']] = attrs['v']
                elif name == 'nd':
                    self.currElem.nds.append(attrs['ref'])

            def endElement(self, name):
                if name == 'node':
                    nodes[self.currElem.id] = self.currElem
                    self.currElem = None
                elif name == 'way':
                    ways[self.currElem.id] = self.currElem
                    self.currElem = None

        # Pass a handler instance so parse state lives on the object rather
        # than on the class.
        xml.sax.parse(filename_or_stream, OSMHandler())
        self.nodes = nodes
        self.ways = ways

        # Count times each node is used.
        node_histogram = dict.fromkeys(self.nodes.keys(), 0)
        # Iterate over a snapshot: entries are deleted inside the loop.
        for way in list(self.ways.values()):
            if len(way.nds) < 2:
                # A way with fewer than two nodes cannot form an edge.
                del self.ways[way.id]
            else:
                for node in way.nds:
                    node_histogram[node] += 1

        # Use that histogram to split all ways, replacing the member set of
        # ways.
        new_ways = {}
        for way in self.ways.values():
            for split_way in way.split(node_histogram):
                new_ways[split_way.id] = split_way
        self.ways = new_ways
import math


def haversine(lat1, long1, lat2, long2):
    """Return the great-circle distance in kilometres between two points.

    All four arguments are in radians. The haversine formula squares the
    sine of *half* the latitude/longitude difference; the original omitted
    the halving and therefore over-estimated distances.
    """
    half_chord = (math.sin((lat2 - lat1) / 2.0) ** 2
                  + math.cos(lat1) * math.cos(lat2)
                  * math.sin((long2 - long1) / 2.0) ** 2)
    # Clamp against rounding error so asin never sees a value above 1.
    return 6371.0 * 2 * math.asin(math.sqrt(min(1.0, half_chord)))


if __name__ == '__main__':
    # Converting Atlanta OSM to pickle.
    import pickle

    graph = read_osm('atlanta.osm')
    new_nodes = {node: graph.nodes[node]['data'] for node in graph.nodes()}

    new_graph = networkx.Graph()
    for node, data in new_nodes.items():
        # Node attributes must be passed as keywords on networkx 2.
        new_graph.add_node(node, **data.__dict__)
    for node1, node2 in graph.edges():
        source, target = new_nodes[node1], new_nodes[node2]
        weight = haversine(*map(math.radians, (source.lat, source.lon,
                                               target.lat, target.lon)))
        new_graph.add_edge(source.id, target.id, weight=weight)

    for key in new_graph.nodes.keys():
        position = (new_graph.nodes[key]['lat'], new_graph.nodes[key]['lon'])
        new_graph.nodes[key]['pos'] = position
        new_graph.nodes[key]['position'] = position

    with open('atlanta_osm.pickle', 'wb') as pickle_file:
        pickle.dump(new_graph, pickle_file)
    print('Done')
# coding=utf-8
import pickle
import random
import unittest
import matplotlib.pyplot as plt
import networkx
from explorable_graph import ExplorableGraph
from submission import PriorityQueue, a_star, bidirectional_a_star
from visualize_graph import plot_search
class TestPriorityQueue(unittest.TestCase):
    """Test Priority Queue implementation"""

    def test_append_and_pop(self):
        """Test the append and pop functions"""
        queue = PriorityQueue()
        temp_list = []

        for _ in range(10):
            a = random.randint(0, 10000)
            queue.append((a, 'a'))
            temp_list.append(a)

        # Items must come back in ascending priority order.
        for item in sorted(temp_list):
            popped = queue.pop()
            self.assertEqual(popped[0], item)

    def test_fifo_property(self):
        """Test the fifo property for nodes with same priority"""
        queue = PriorityQueue()
        temp_list = [(1, 'b'), (1, 'c'), (1, 'a')]

        for node in temp_list:
            queue.append(node)

        # Equal priorities must be served in insertion order.
        for expected_node in temp_list:
            actual_node = queue.pop()
            self.assertEqual(actual_node[-1], expected_node[-1])
class TestBasicSearch(unittest.TestCase):
    """Test the simple search algorithms: BFS, UCS, A*"""

    def setUp(self):
        """Romania map data from Russell and Norvig, Chapter 3."""
        # NOTE: pickle.load executes code from the file; only load the
        # fixture shipped with the project.
        with open('romania_graph.pickle', 'rb') as rom:
            romania = pickle.load(rom)
        self.romania = ExplorableGraph(romania)
        self.romania.reset_search()

    def test_a_star(self):
        """Test and visualize A* search"""
        start = 'a'
        goal = 'u'

        node_positions = {n: self.romania.nodes[n]['pos'] for n in
                          self.romania.nodes.keys()}

        self.romania.reset_search()
        path = a_star(self.romania, start, goal)

        self.draw_graph(self.romania, node_positions=node_positions,
                        start=start, goal=goal, path=path,
                        title='test_astar blue=start, yellow=goal, green=explored')

    def test_a_star_num_explored(self):
        """Test A* for correct path and number of explored nodes"""
        start = 'a'
        goal = 'u'

        self.romania.reset_search()
        path = a_star(self.romania, start, goal)

        # Check for correct path.
        self.assertEqual(path, ['a', 's', 'r', 'p', 'b', 'u'])

        # Compare explored nodes to reference implementation.
        explored_nodes = sum(self.romania.explored_nodes().values())
        self.assertEqual(explored_nodes, 8)

    @staticmethod
    def draw_graph(graph, node_positions=None, start=None, goal=None,
                   path=None, title=''):
        """Visualize results of graph search"""
        counts = graph.explored_nodes()
        explored = [key for key in counts if counts[key] > 0]

        labels = {node: node for node in graph}

        if node_positions is None:
            node_positions = networkx.spring_layout(graph)

        networkx.draw_networkx_nodes(graph, node_positions)
        networkx.draw_networkx_edges(graph, node_positions, style='dashed')
        networkx.draw_networkx_labels(graph, node_positions, labels)

        networkx.draw_networkx_nodes(graph, node_positions, nodelist=explored,
                                     node_color='g')
        edge_labels = networkx.get_edge_attributes(graph, 'weight')
        networkx.draw_networkx_edge_labels(graph, node_positions,
                                           edge_labels=edge_labels)

        if path is not None:
            edges = [(path[i], path[i + 1]) for i in range(0, len(path) - 1)]
            networkx.draw_networkx_edges(graph, node_positions, edgelist=edges,
                                         edge_color='b')

        if start:
            networkx.draw_networkx_nodes(graph, node_positions,
                                         nodelist=[start], node_color='b')

        if goal:
            networkx.draw_networkx_nodes(graph, node_positions,
                                         nodelist=[goal], node_color='y')

        plt.title(title)
        plt.show()
class TestBidirectionalSearch(unittest.TestCase):
    """Test the bidirectional search algorithms: UCS, A*"""

    def setUp(self):
        """Load the Romania graph and wrap it so explored nodes are counted."""
        # NOTE: pickle.load executes code from the file; only load the
        # fixture shipped with the project.
        with open('romania_graph.pickle', 'rb') as rom:
            romania = pickle.load(rom)
        self.romania = ExplorableGraph(romania)
        self.romania.reset_search()

    def test_bidirectional_a_star(self):
        """Test and visualize bidirectional A* search"""
        start = 'a'
        goal = 'u'

        node_positions = {n: self.romania.nodes[n]['pos'] for n in
                          self.romania.nodes.keys()}

        path = bidirectional_a_star(self.romania, start, goal)

        self.draw_graph(self.romania, node_positions=node_positions,
                        start=start, goal=goal, path=path,
                        title='test_bidirectional_astar blue=start, yellow=goal, green=explored')

    @staticmethod
    def draw_graph(graph, node_positions=None, start=None, goal=None,
                   path=None, title=''):
        """Visualize results of graph search"""
        counts = graph.explored_nodes()
        explored = [key for key in counts if counts[key] > 0]

        labels = {node: node for node in graph}

        if node_positions is None:
            node_positions = networkx.spring_layout(graph)

        networkx.draw_networkx_nodes(graph, node_positions)
        networkx.draw_networkx_edges(graph, node_positions, style='dashed')
        networkx.draw_networkx_labels(graph, node_positions, labels)

        networkx.draw_networkx_nodes(graph, node_positions, nodelist=explored,
                                     node_color='g')
        edge_labels = networkx.get_edge_attributes(graph, 'weight')
        networkx.draw_networkx_edge_labels(graph, node_positions,
                                           edge_labels=edge_labels)

        if path is not None:
            edges = [(path[i], path[i + 1]) for i in range(0, len(path) - 1)]
            networkx.draw_networkx_edges(graph, node_positions, edgelist=edges,
                                         edge_color='b')

        if start:
            networkx.draw_networkx_nodes(graph, node_positions,
                                         nodelist=[start], node_color='b')

        if goal:
            networkx.draw_networkx_nodes(graph, node_positions,
                                         nodelist=[goal], node_color='y')

        plt.title(title)
        plt.show()
if __name__ == '__main__':
    # Run every test case in this module when executed as a script.
    unittest.main()
# coding=utf-8
"""
This file is your main submission that will be graded against. Only copy-paste
code into the relevant classes included here. Do not add any classes or
functions to this file that are not part of the classes that we want.
"""
import heapq
import os
import pickle
import math
class PriorityQueue(object):
    """
    A queue structure where each element is served in order of priority.

    Elements in the queue are popped based on the priority with higher priority
    elements being served before lower priority elements. If two elements have
    the same priority, they will be served in the order they were added to the
    queue. The lowest priority value is served first.

    Nodes are tuples whose first element is the priority, e.g.
    ``(priority, value)``. Internally each node is stored as
    ``(priority, insertion_count, *rest)`` so that ties are broken by
    insertion order and the payload itself is never compared.

    Attributes:
        queue (list): Heap of stored entries.
        count (int): Number of nodes appended so far (the tie-breaker).
    """

    def __init__(self):
        """Initialize a new Priority Queue."""
        self.queue = []
        self.count = 0

    @staticmethod
    def _strip(entry):
        """Return the stored entry without its insertion counter."""
        return (entry[0],) + tuple(entry[2:])

    def pop(self):
        """
        Pop top priority node from queue.

        Returns:
            The node with the highest priority.

        Raises:
            IndexError: if the queue is empty.
        """
        return self._strip(heapq.heappop(self.queue))

    def remove(self, node):
        """
        Remove a node from the queue.

        Only the first stored occurrence (in heap order) is removed.

        Args:
            node (tuple): The node to remove from the queue.

        Raises:
            ValueError: if the node is not in the queue.
        """
        target = tuple(node)
        for index, entry in enumerate(self.queue):
            if self._strip(entry) == target:
                del self.queue[index]
                # Deleting from the middle breaks the heap invariant.
                heapq.heapify(self.queue)
                return
        raise ValueError('node not in queue: %r' % (node,))

    def __iter__(self):
        """Queue iterator; yields nodes in the order pop() would serve them."""
        return iter([self._strip(entry) for entry in sorted(self.queue)])

    def __str__(self):
        """Priority Queue to string."""
        return 'PQ:%s' % [self._strip(entry) for entry in self.queue]

    def append(self, node):
        """
        Append a node to the queue.

        Args:
            node: Tuple whose first element is the priority; all remaining
                elements are kept and returned unchanged by pop().
        """
        entry = (node[0], self.count) + tuple(node[1:])
        heapq.heappush(self.queue, entry)
        self.count += 1

    def __contains__(self, key):
        """
        Containment Check operator for 'in'

        Args:
            key: The key to check for in the queue.

        Returns:
            True if key is found in queue, False otherwise.
        """
        return key in (entry[-1] for entry in self.queue)

    def __eq__(self, other):
        """
        Compare this Priority Queue with another Priority Queue.

        Args:
            other (PriorityQueue): Priority Queue to compare against.

        Returns:
            True if the two priority queues are equivalent.
        """
        if not isinstance(other, PriorityQueue):
            return NotImplemented
        return self.queue == other.queue

    def size(self):
        """
        Get the current size of the queue.

        Returns:
            Integer of number of items in queue.
        """
        return len(self.queue)

    def clear(self):
        """Reset queue to empty (no nodes)."""
        self.queue = []
        self.count = 0

    def top(self):
        """
        Get the top item in the queue.

        Returns:
            The first item stored in the queue.

        Raises:
            IndexError: if the queue is empty.
        """
        return self._strip(self.queue[0])
def null_heuristic(graph, v, goal):
    """
    Null heuristic used as a base line.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        v (str): Key for the node to calculate from.
        goal (str): Key for the end node to calculate to.

    Returns:
        0
    """
    return 0
def euclidean_dist_heuristic(graph, v, goal):
    """
    Warm-up exercise: Implement the euclidean distance heuristic.

    See README.md for exercise description.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        v (str): Key for the node to calculate from.
        goal (str): Key for the end node to calculate to.

    Returns:
        Euclidean distance between `v` node and `goal` node
    """
    # Positions are read from the 'pos' node attribute as (x, y) pairs.
    v_x, v_y = graph.nodes[v]['pos'][0], graph.nodes[v]['pos'][1]
    goal_x, goal_y = graph.nodes[goal]['pos'][0], graph.nodes[goal]['pos'][1]
    return math.sqrt((v_x - goal_x) ** 2 + (v_y - goal_y) ** 2)
def a_star(graph, start, goal, heuristic=euclidean_dist_heuristic):
    """
    Warm-up exercise: Implement A* algorithm.

    See README.md for exercise description.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        start (str): Key for the start node.
        goal (str): Key for the end node.
        heuristic: Function to determine distance heuristic.
            Default: euclidean_dist_heuristic.

    Returns:
        The best path as a list from the start and goal nodes (including both).
        An empty list is returned when start and goal are the same node (as
        the README requires) and when no path connects them.
    """
    if start == goal:
        print("Total cost = ", 0)
        print("Path: ", [])
        return []

    frontier = PriorityQueue()
    frontier.append((heuristic(graph, start, goal), start))
    best_cost = {start: 0}   # cheapest known cost from start to each node
    parent = {start: None}   # predecessor on that cheapest route
    explored = set()

    while frontier.size() > 0:
        node = frontier.pop()[-1]
        if node in explored:
            # Stale duplicate left behind by a later, cheaper push; expanding
            # it again would only inflate the explored-node count.
            continue

        if node == goal:
            path = []
            while node is not None:
                path.append(node)
                node = parent[node]
            path.reverse()
            print("Total cost = ", best_cost[goal])
            print("Path: ", path)
            return path

        explored.add(node)
        for n in graph.neighbors(node):
            if n in explored:
                continue
            # get_edge_weight is the accessor the README prescribes; it does
            # not count as exploring a node.
            g = best_cost[node] + graph.get_edge_weight(node, n)
            if n in best_cost and g >= best_cost[n]:
                continue
            best_cost[n] = g
            parent[n] = node
            frontier.append((g + heuristic(graph, n, goal), n))

    # Frontier exhausted without reaching the goal.
    return []
def bidirectional_a_star(graph, start, goal,
                         heuristic=euclidean_dist_heuristic):
    """
    Exercise 2: Bidirectional A*.

    See README.md for exercise description.

    Two A* searches run at the same time: one from start towards goal and one
    from goal towards start, each with its own heuristic target. Whenever a
    node has been reached from both sides, the joined route is a candidate
    answer. The search stops once the smallest f-value in either frontier is
    at least the cost of the best candidate, because no unexplored route can
    then be cheaper. This assumes a consistent heuristic, as a_star does.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        start (str): Key for the start node.
        goal (str): Key for the end node.
        heuristic: Function to determine distance heuristic.
            Default: euclidean_dist_heuristic.

    Returns:
        The best path as a list from the start and goal nodes (including both).
        An empty list is returned when start and goal are the same node and
        when no path connects them.
    """
    if start == goal:
        return []

    # Index 0 is the forward search (start -> goal), index 1 the backward one.
    targets = (goal, start)
    frontiers = (PriorityQueue(), PriorityQueue())
    costs = ({start: 0}, {goal: 0})
    parents = ({start: None}, {goal: None})
    closed = (set(), set())
    frontiers[0].append((heuristic(graph, start, goal), start))
    frontiers[1].append((heuristic(graph, goal, start), goal))

    best_total = float('inf')  # cost of the cheapest joined route so far
    meeting = None             # node where that route joins both searches

    def discard_stale(side):
        """Drop already-expanded nodes from the top of a frontier."""
        queue = frontiers[side]
        while queue.size() > 0 and queue.top()[-1] in closed[side]:
            queue.pop()

    while True:
        discard_stale(0)
        discard_stale(1)
        if frontiers[0].size() == 0 or frontiers[1].size() == 0:
            # One side has explored everything it can reach.
            break
        top_forward = frontiers[0].top()[0]
        top_backward = frontiers[1].top()[0]
        if max(top_forward, top_backward) >= best_total:
            break

        # Expand the side whose best candidate looks cheaper.
        side = 0 if top_forward <= top_backward else 1
        other = 1 - side
        node = frontiers[side].pop()[-1]
        closed[side].add(node)

        for neighbor in graph.neighbors(node):
            if neighbor in closed[side]:
                continue
            g = costs[side][node] + graph.get_edge_weight(node, neighbor)
            if neighbor in costs[side] and g >= costs[side][neighbor]:
                continue
            costs[side][neighbor] = g
            parents[side][neighbor] = node
            frontiers[side].append(
                (g + heuristic(graph, neighbor, targets[side]), neighbor))
            # The neighbor is already known to the opposite search: the two
            # partial routes join into a complete start-goal route.
            if neighbor in costs[other]:
                total = g + costs[other][neighbor]
                if total < best_total:
                    best_total = total
                    meeting = neighbor

    if meeting is None:
        return []

    # Walk back to start, then forward to goal, from the meeting node.
    path = []
    node = meeting
    while node is not None:
        path.append(node)
        node = parents[0][node]
    path.reverse()
    node = parents[1][meeting]
    while node is not None:
        path.append(node)
        node = parents[1][node]
    return path
def path_len(graph, path):
    """Return the total edge weight along ``path``.

    Args:
        graph: Graph exposing ``get_edge_weight(u, v)``.
        path (list): Sequence of node keys; consecutive nodes must be
            connected by an edge.

    Returns:
        Sum of the weights of consecutive edges; 0 for paths with fewer than
        two nodes.
    """
    if len(path) <= 1:
        return 0
    total = 0
    for pred, node in zip(path[:-1], path[1:]):
        total += graph.get_edge_weight(pred, node)
    return total
def load_data(graph, time_left):
    """
    Feel free to implement this method. We'll call it only once
    at the beginning of the Race, and we'll pass the output to your
    custom_search function.

    graph: a networkx graph
    time_left: function you can call to keep track of your remaining time.
        usage: time_left() returns the time left in milliseconds.
        the max time will be 10 minutes.

    * To get a list of nodes, use graph.nodes()
    * To get node neighbors, use graph.neighbors(node)
    * To get edge weight, use graph.get_edge_weight(node1, node2)

    Returns:
        None; no data is precomputed at the moment.
    """
    return None
"""Convert OSM map to GeoJSON format."""
from networkx import degree
from networkx import all_neighbors
from osm2networkx import read_osm
from random import shuffle
from random import randint
from sys import argv
from os import remove
from os.path import isfile
def get_random_nodes(graph, K=100):
    """Sample up to K random nodes from graph.

    Returns a list with ``min(K, number of nodes)`` distinct nodes in random
    order.
    """
    # graph.nodes() is a read-only view on networkx 2; shuffle needs a real
    # list it can reorder in place.
    node_sample = list(graph.nodes())
    shuffle(node_sample)
    return node_sample[0:min(K, len(node_sample))]
def plot_random_nodes(graph, outfile_name, K=100):
    """Write K random nodes to GeoJSON file."""
    node_sample = get_random_nodes(graph, K)
    with open(outfile_name, 'w') as outfile:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')
        plot_nodes(node_sample, graph, outfile, False, False)
        outfile.write(']\n}')
    # The original called plot_nodes once more here, after the file had
    # already been closed, which raised ValueError.
def plot_nodes(node_list, graph, outfile, header=False, footer=False, color="#F5A207"):
    """Write list of nodes from graph to GeoJSON file.

    Args:
        node_list: Node keys to write.
        graph: Graph whose node attributes include 'lat' and 'lon'.
        outfile: Open, writable text file.
        header: When true, write the FeatureCollection opening first.
        footer: When true, write the FeatureCollection closing afterwards.
        color: Marker color stored in each feature's properties.
    """
    if header:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')
    node_strings = [node_to_GeoJSON(node, graph, color) for node in node_list]
    outfile.write(','.join(node_strings))
    if footer:
        outfile.write(']\n}')
    print('done writing nodes to file')
def node_to_GeoJSON(node, graph, color="#F5A207"):
    """Convert node to GeoJSON string.

    Reads 'lat' and 'lon' from the node's attributes and returns a Point
    feature with coordinates in [lon, lat] order.
    """
    data = graph.nodes[node]
    lat = data['lat']
    lon = data['lon']
    return ''.join([
        '{ "type" : "Feature",\n',
        '"geometry" : {"type": "Point", "coordinates": [%f,%f]},\n' % (lon, lat),
        '"properties": {"marker-color": "%s"}\n' % (color),
        '}\n',
    ])
def plot_edges(edge_list, graph, outfile, header=False, footer=False, color="#F5A207"):
    """Write list of edges from graph to GeoJSON file.

    Args:
        edge_list: (start, end) node-key pairs to write.
        graph: Graph whose node attributes include 'lat' and 'lon'.
        outfile: Open, writable text file.
        header: When true, write the FeatureCollection opening first.
        footer: When true, write the FeatureCollection closing afterwards.
        color: Marker color stored in each feature's properties.
    """
    if header:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')
    edge_strings = [edge_to_GeoJSON(edge, graph, color) for edge in edge_list]
    outfile.write(','.join(edge_strings))
    if footer:
        outfile.write(']\n}')
    print('done writing edges to file')
def edge_to_GeoJSON(edge, graph, color="#F5A207"):
    """Convert edge to GeoJSON string.

    ``edge`` is a (start, end) pair of node keys; the result is a LineString
    feature whose two coordinates are in [lon, lat] order.
    """
    start, end = edge[0], edge[1]
    start_lon = graph.nodes[start]['lon']
    start_lat = graph.nodes[start]['lat']
    end_lon = graph.nodes[end]['lon']
    end_lat = graph.nodes[end]['lat']
    return ''.join([
        '{ "type" : "Feature",\n',
        '"geometry" : {"type": "LineString", ',
        '"coordinates": [[%f,%f], [%f,%f]]},\n' % (
            start_lon, start_lat, end_lon, end_lat),
        '"properties": {"marker-color": "%s"}\n' % (color),
        '}\n',
    ])
def get_random_path(graph, K=100):
    """Pick one of top 10 most-connected nodes as
    start, get a random neighbor, repeat up to K
    times to build a random path.

    Each step moves to one of the (up to three) highest-degree neighbors
    that are not on the path yet, and stops early when none is left.

    Returns a tuple ``(path_nodes, explored_nodes)``: the list of node ids
    on the path, beginning with the start node, and the set of every node
    that was seen as a candidate along the way. Both are empty for a graph
    without nodes.
    """
    nodes = sorted(graph.nodes(), key=graph.degree, reverse=True)
    if not nodes:
        return [], set()

    # randint is inclusive on both ends, hence the -1: without it the index
    # could equal len(nodes) on graphs with 10 or fewer nodes.
    start_node = nodes[randint(0, min(10, len(nodes)) - 1)]
    print('start node has degree %d' % (graph.degree(start_node)))

    # The start node belongs to the path; leaving it out let the walk return
    # to it and omitted it from the plotted route.
    path_nodes = [start_node]
    explored_nodes = set([start_node])
    last_node = start_node
    K = min(K, len(nodes))
    for _ in range(1, K):
        neighbors = [n for n in all_neighbors(graph, last_node)]
        new_nodes = set(neighbors).difference(set(path_nodes))
        if not new_nodes:
            print('finish with node %s with neighbors %s and explored_nodes %s' %
                  (last_node, str(neighbors), str(path_nodes)))
            break
        explored_nodes = explored_nodes.union(new_nodes)
        candidates = sorted(new_nodes, key=graph.degree, reverse=True)
        last_node = candidates[randint(0, min(2, len(candidates) - 1))]
        path_nodes.append(last_node)
    return path_nodes, explored_nodes
def plot_search(graph, outfile_name, path_nodes, explored_nodes):
    """Plot path nodes/edges as well as all other
    explored nodes in different color.

    Args:
        graph: Graph whose node attributes include 'lat' and 'lon'.
        outfile_name: Path of the GeoJSON file to write.
        path_nodes: Node keys along the path, in order.
        explored_nodes: Either a dict mapping node -> exploration count
            (nodes with a count of 0 are ignored) or a plain collection of
            explored nodes; may be empty or None.
    """
    path_nodes = list(path_nodes)
    path_edges = list(zip(path_nodes[:-1], path_nodes[1:]))

    # plot_random_path passes a set while the search tests pass the counter
    # dict of ExplorableGraph; accept both.
    if not explored_nodes:
        other_nodes = set()
    elif hasattr(explored_nodes, 'keys'):
        other_nodes = set(key for key in explored_nodes.keys()
                          if explored_nodes[key] > 0)
    else:
        other_nodes = set(explored_nodes)
    # Explored nodes that are not on the path are plotted in black.
    other_nodes = other_nodes.difference(set(path_nodes))

    sections = [
        (plot_nodes, path_nodes, {}),
        (plot_edges, path_edges, {}),
        (plot_nodes, other_nodes, {'color': "#000000"}),
    ]

    with open(outfile_name, 'w') as outfile:
        outfile.write('{ "type" : "FeatureCollection", \n')
        outfile.write('"features" : [\n')
        wrote_section = False
        for writer, items, options in sections:
            if not items:
                # Skipping empty sections avoids dangling commas, which
                # would make the output invalid JSON.
                continue
            if wrote_section:
                outfile.write(',\n')  # separate each category
            writer(items, graph, outfile, False, False, **options)
            wrote_section = True
        outfile.write(']\n}')
def plot_random_path(graph, outfile_name, K=100):
    """Plot random path and write to file.

    Builds a random path of up to K nodes with get_random_path and writes
    it, together with the nodes explored on the way, to ``outfile_name``.
    """
    path_nodes, explored_nodes = get_random_path(graph, K)
    plot_search(graph, outfile_name, path_nodes, explored_nodes)
    print('done plotting random path')
def test_plot_random_path(graph, outfile_name):
    """Testing the random path plotting.

    Args:
        graph: Filename (or stream) of the OSM data to read.
        outfile_name: GeoJSON file to write; an existing file is removed
            first.
    """
    osm_graph = read_osm(graph)
    if isfile(outfile_name):
        remove(outfile_name)
    plot_random_path(osm_graph, outfile_name, K=100)
if __name__ == "__main__":
    # Read graph from file and write nodes/edges to file.
    # Both the input and the output file name are required; the original
    # only checked for one argument and then indexed argv[2].
    if len(argv) > 2:
        test_plot_random_path(argv[1], argv[2])
    else:
        print('usage: %s <osm_file> <output_geojson>' % argv[0])
CS 4332 Introduction to Artificial Intelligence
Project 2 – A* Search and Bi-Directional A* Search
Overview
Search is an integral part of AI. It helps in problem solving across a wide variety of domains where a solution isn’t immediately clear. You will implement a graph search algorithm with the goal of solving bi-directional search.
Submission
Submit the submission.py file in Blackboard. The deliverable for the assignment is this ‘submission.py’ file with the necessary functions/methods completed. No other Python source files need to be submitted.
Also include snapshots of the output of your program and the generated graph showing the search results in your submission.
The Files
While you'll only have to edit and submit submission.py, there are a number of notable files:
1. submission.py: Where you will implement your bi-directional A* Search.
2. search_submission_tests.py: Sample tests to validate your searches locally.
3. romania_graph.pickle: Serialized graph files for Romania.
4. explorable_graph.py: A wrapper around networkx that tracks explored nodes. FOR DEBUGGING ONLY.
5. visualize_graph.py: Module to visualize search results. See below on how to use it.
6. osm2networkx.py: Module used by visualize graph to read OSM networks.
The Assignment
Your task is to implement an informed search algorithm that will calculate a driving route between two points in Romania with a minimal time and space cost. There is a search_submission_tests.py file to help you along the way. We will be using an undirected network representing a map of Romania.
Notes
There are some things that are among our most common questions:
· Remember that if start and goal are the same, you should return []. This keeps your results consistent with ours and avoids some headache.
· Most ‘NoneType object …’ errors are because the path you return is not completely connected (a pair of successive nodes in the path are not connected). Or because the path variable itself is empty.
· Individual tests can be run using the following:
import search_submission_tests as tests
tests.TestPriorityQueue().test_append_and_pop()
· For running the search tests, use this:
import search_submission_tests as tests
testclass = tests.TestBasicSearch()
testclass.setUp()
testclass.test_a_star()
A* search
The implemented A* search uses Euclidean distance as the default heuristic, which is implemented in function euclidean_dist_heuristic(). It is passed to a_star() as the heuristic parameter. We provide null_heuristic() as a baseline heuristic to test against when calling a_star tests.
Notes:
1. You need to include start and goal in the path.
2. If your start and goal are the same then just return [].
3. The above are just to keep your results consistent with our test cases.
4. You can access all the neighbors of a given node by calling graph[node], or graph.neighbors(node) ONLY.
5. You can access the weight of an edge using: graph.get_edge_weight(node_1, node_2). Not using this method will result in your explored nodes count being higher than it should be.
6. You can access the (x, y) position of a node using: graph.nodes[n][‘pos’]. You will need this for calculating the heuristic distance.
Bidirectional A* search
Implement bidirectional A* search. Remember that you need to calculate a heuristic for both the start-to-goal search and the goal-to-start search.
bidirectional_a_star() should return the path from the start node to the goal node, as a list of nodes.
Resources
· A* Search
Links from Udacity, below the videos:
· Reach for A∗: An Efficient Point-to-Point Shortest Path Algorithm
· Computing the Shortest Path: A∗ Search Meets Graph Theory