from PathGraph import * from Queue import PriorityQueue from math import sqrt from collections import deque from random import shuffle from time import sleep from time import time # This returns an instantiated version of the pathfinding classes specified. # If a pathfinding algorithm is added here, it will automatically show up in the gui. def allPathfinding(graph): return [ AStar(graph), RRT(graph), # NodePlot(graph), Dijkstra(graph), # Astar2(graph), # RRT2(graph) Laplace(graph) ] # Returns the euclidean distance between two node id's in the given graph def euclidDist(a,b, graph): sum = 0 for i in range(graph.coords): sum += (graph.getNode(a).coords[i] - graph.getNode(b).coords[i]) ** 2 return float(sqrt(sum)) # The base pathfinding class. Inherit and overwrite FindPath to implement a pathfinding algorithm. class Pathfind: label = "" # The label of the Pathfinding class. Used as the label on the gui # Creates a new instance of the pathfinding class, over the specified PathGraph. # Does not have to be linked to a gui. def __init__(self, graph, gui=None): self.graph = graph self.gui = gui self.path = None self.pathLen = -1 self.drawnObjects = [] # Find a path from the starting node id to the goal node id. # Such a path found is a list of node id's, which form a valid path from startNodeID # to goalNodeID, such that each node represented in the array is adjacent to its # immediate neighbors in the list. List is NOT returned, instead set it to self.path # Default behavior is to fail to find a path. Overwrite this in path algorithms implemented. def FindPath(self, startNodeID, goalNodeID): self.path = None return # Returns a list of tuples of imformation about the path. # The list is of 2-tuples. Entry 1 is the label, entry 2 is the information. # Default behavior is to return the length. Overwrite this is desired in path algorithms implemented. def GetStats(self): if self.path == None: return [("No Path Found", "")] pathStats = [] length = 0 for nodeID, adjNodeID in zip(self.path[:-1], self.path[1:]): adjCost = self.graph.getAdjacencyCost(nodeID, adjNodeID) if adjCost: length += adjCost pathStats.append(("Path Length:", round(length,3))) return pathStats ############################################# # Nothing needs to be overwritten past here # ############################################# # Set which gui this is attached to. def SetGui(self, gui): self.gui = gui # Smooth the path. Currently does not work as intended. def Smooth(self): if self.path: #pathobjects = [] thisnodem2=[] # node-2 thisnodem1=[] # node-1 xm1 = -1 ym1 = -1 xp1 = -1 yp1 = -1 xavg = -1 yavg = -1 for index in range(1,len(self.path)-1,2): # starting at 1, ending at next to last, in steps of 2 nodeID = self.path[index] thisnode = self.graph.getNode(nodeID) (x,y) = thisnode.getCoordinates() prevnode = self.graph.getNode(nodeID-1) (xm1,ym1) = prevnode.getCoordinates() nextnode = self.graph.getNode(nodeID+1) (xp1,yp1) = nextnode.getCoordinates() xavg = 0.5* (xm1 + xp1) yavg = 0.5* (ym1 + yp1) x2 = int(0.5+(x - 2.0*(x-xavg))) y2 = int(0.5+(y - 2.0*(y-yavg))) targetnode = self.graph.nodeDict.get((x2,y2),None) if (targetnode != None): print x, ' -> ', x2, ', ', y, ' -> ',y2 self.path[index] = targetnode else: print "No change: ",x,", ",y #thisnode.setCoordinates((x2,y2)) #print "(x,y)= (", x,",",y,") (x2,y2)= (", x2,",",y2,")", #print "(xm1,ym1)= (", xm1,",",ym1,") (xp1,yp1)= (", xp1,",",yp1,")" #print self.path else: return # If there is an attached gui, draw the list of nodeID's in drawList. color and fill are the colors desired, # height and width control the size of the rectangles drawn. If clear is False, does not remove any previous # results from calling this. # This is used by the "animation" feature from the gui, to animate your planning algorithm. def DrawList(self, drawList, height=0, width=0, color='green', fill=None, clear=True): if self.gui and self.gui.planDraw.get(): if clear: self.ClearDraws() for node in drawList: (x,y) = self.graph.getNode(node).getCoordinates() self.drawnObjects.append(self.gui.map.create_rectangle(x+width,y+height,x-width,y-height,outline=color, fill=fill)) self.gui.tkroot.update_idletasks() # Clear all the graphics from calling DrawList def ClearDraws(self): if self.drawnObjects: for obj in self.drawnObjects: self.gui.map.delete(obj) self.drawnObjects = [] # Return the path found (or the lack of such) def ReturnPath(self): return self.path class Laplace(Pathfind): label = "Laplace ala Dave" #redefinition of DrawList to plot the color by field strength def DrawList(self, drawList, height=0, width=0, color= 'green', fill=None, clear=False): if self.gui and self.gui.planDraw.get(): # if clear: # self.ClearDraws() for node in drawList: (x,y) = self.graph.getNode(node).getCoordinates() if self.graph.getNode(node).charge > 0.9: color = 'black' elif self.graph.getNode(node).charge > 0.8: color = 'blue' elif self.graph.getNode(node).charge > 0.7: color = 'cyan' elif self.graph.getNode(node).charge > 0.6: color = 'brown' elif self.graph.getNode(node).charge > 0.5: color = 'green' else: color = 'red' self.drawnObjects.append(self.gui.map.create_rectangle(x+width,y+height,x-width,y-height,outline=color, fill=fill)) self.gui.tkroot.update_idletasks() def FindPath(self, start, end): #initializations tolerance = 0.003 maxErr = 1.0 nNeighbors = 8 iteration = 0 #just a debug iteration checker maxCharge = 1.0 for node in self.graph.nodeList: if node.nid == end: node.charge = 0.0 else: node.charge = 0.5 #values in node.val and node.error are initialized in pathgraph.py #intialize end charge to 0 #self.graph.getNode(start).charge = 1.0 self.graph.getNode(end).charge = 0.0 #how to handle nodes on the boundary, which get a charge of 1? #solving Laplace's equation for the problem #while maxErr > tolerance: while iteration < 20: checkNode = self.graph.getNode(end) for neb in checkNode.adjs: print self.graph.getNode(neb[0]).charge print "check node charge at iteration" +str(iteration) + ": "+ str(checkNode.charge) for node in self.graph.nodeList: if node.nid != end: #make sure we don't edit end charge tmp = node.charge sumVal = 0 for neb in node.adjs: sumVal = sumVal + self.graph.getNode(neb[0]).charge #if there are less than 8 neighbors, node is at boundary #add a max charge to sum for each boundary node sumVal = sumVal + maxCharge * (nNeighbors - len(node.adjs)) node.charge = sumVal / nNeighbors node.error = abs(node.charge - tmp) else: node.error = 0 maxErr = max([node.error for node in self.graph.nodeList]) iteration = iteration + 1 print "iterations to convergence: " + str(iteration) #visualization of the field: #for node in self.graph.nodeList: # print "charge: " + str(node.charge) fieldList = [node.nid for node in self.graph.nodeList] self.DrawList(fieldList, clear = False) #build the path from start by moving to node with biggest potential difference #self.path = [] #empty list of node ids #self.path.append(start) #currentNode = self.graph.getNode(start) #while currentNode.nid != end: # nebGrads = {} # for neb in currentNode.adjs: # nebGrads[neb[0]] = currentNode.charge - self.graph.getNode(neb[0]).charge # currentNode = self.graph.getNode(max(nebGrads, key = nebGrads.get)) #move to node with highest positive difference # self.path.append(currentNode.nid) return class RRT2(Pathfind): label = "RRT ala Dave" def FindPath(self,start,end): begin = time() #initializations #create two trees which begin at start and end, respectively #trees are dictionaries which track nodes and the previous node in the tree startTree = {} endTree = {} startTree[start]="" endTree[end]="" #Build list of node IDs in map, shuffle it randomly randList = [node.nid for node in self.graph.nodeList] shuffle(randList) randList.remove(start) randList.remove(end) #subroutine "Extend": #1. Construct a dict of all nodes in someTree and their euclidean distance to someNode #2. Finds the nearest node in a tree to a given node by euclidean distance #3. Adds new nodes to tree while sequentially moving towards random node #4. Each time a node is added, it is removed from the primary nodelist #5. If an obstacle is reached, the branch ends, else it ends when goal is reached def Extend(someTree, someNode, otherTree): distances = dict((node, euclidDist(node, someNode, self.graph)) for node in someTree) nearestNode = min(distances, key = distances.get) newNode = 0 Trapped = False while newNode != someNode: meetingNode = FindIntersection(someTree, otherTree) newNode, isTrapped = Step(nearestNode, someNode) #if we can't go any further, stop extending if isTrapped == True: return meetingNode someTree[newNode] = nearestNode #this if/else is necessary when the two trees meet and nodes were popped elsewhere if newNode in randList: randList.remove(newNode)#remove newNode from randList else: return meetingNode nearestNode = newNode return meetingNode #subroutine "Step": #1. find all of the adjacents to redNode #2. return the neighbor with the minimum distance to blueNode (along trajectory) def Step(redNode, blueNode): neighbors = self.graph.getAdjacencies(redNode) nebDists = dict((neighbor[0], euclidDist(neighbor[0], blueNode, self.graph)) for neighbor in neighbors) minNeb = min(nebDists, key = nebDists.get) #check if redNode is "trapped" (i.e. can't extend any further towards goal) #define being trapped as the distance from a node is less than the distance from any neighbor if euclidDist(redNode, blueNode, self.graph) < nebDists[minNeb]: Trapped = True else: Trapped = False return minNeb, Trapped #subroutine "CheckIntersection" #1. compare the keys in two dictionaries #2 if a key exists in both dicts, return it, else return 0 def FindIntersection(redDict, blueDict): for key in redDict: if key in blueDict: print 'trees intersect!' return key return 0 #subroutine "BuildPath" #1. constructs a path from start to end which meets at a node def BuildPath(someNode, redTree, blueTree): self.path=[] startPath = [] endPath = [] #find the path from intersection back to start nodeID = someNode while nodeID != start: startPath.append(nodeID) nodeID = startTree[nodeID]#moves one node back in path startPath.reverse() #reverse path so it begins at start startPath.pop() #removes meetingNode from startPath (will be included in endPath) #find the path from intersection to end nodeID = someNode while nodeID != end: endPath.append(nodeID) nodeID = endTree[nodeID]#moves one node back in path self.path = startPath + endPath return #Main algorithm: #1. pop a node off of the randomized node list #2. extend each tree towards that node #3. check for tree intersection #4. lather, rinse, repeat while randList: randomNode = randList.pop() meetCheck1 = Extend(startTree, randomNode, endTree) meetCheck2 = Extend(endTree, randomNode, startTree) self.DrawList(startTree.keys(), clear = False, color='green') self.DrawList(endTree.keys(), clear = False, color = 'red') if meetCheck1 > 0: elapsed = (time() - begin) print "seconds to find path: " + str(elapsed) BuildPath(meetCheck1, startTree, endTree) return elif meetCheck2>0: elapsed = (time() - begin) print "seconds to find path: " + str(elapsed) BuildPath(meetCheck2, startTree, endTree) return #if all nodes have been explored and there are no intersections, no path exists self.path = None return # Node Plot: Plots the ID of all the nodes in the graph # 1. gets the list of all nodes from the map graph # 2. prints those nodes to the screen # 3. prints adjacent nodes and distances class NodePlot(Pathfind): label = "Node Plot" def FindPath(self, dummy1, dummy2): for i in self.graph.nodeList: print "Node Number: " + str(i.nid) print "Node Location: " + i.coordinateString() print "List of Adjacent Nodes:" for j in i.adjs: print "Node: " + str(j[0]) + ", Distance of: " + str(j[1]) class Dijkstra(Pathfind): label = "Dijkstra" def FindPath(self, start, end): begin = time() #function to pull a subdictionary from a larger dictionary...thank you stackoverflow def sub_dict(somedict, somekeys, default=None): return dict([(k, somedict.get(k,default)) for k in somekeys ]) ##########initialization############# #1.build an empty dictionary of node IDs (keys) and stored distances #2.build an empty dictionary of last nodeIDs through minimum path (current node is dict key) distance = {} last = {} #3.set all nodal distances to infinity and dictionary of last IDs to empty for node in self.graph.nodeList: distance[node.nid] = float('inf') last[node.nid] = "" #4.set start node distance to 0 distance[start] = 0 #5.create a list of unvisited nodes (all nodes) and visited node IDs (no nodes) unvisited = [node.nid for node in self.graph.nodeList] visited =[] ############main algorithm############### while unvisited: #build a subdictionary of unvisited distances and their nodes unDist=sub_dict(distance,[nodeID for nodeID in unvisited]) #find node with minimum distance in subdictionary minID = min(unDist,key=unDist.get) minDist = unDist[minID] #remove minimum node from unvisited list and add to visited list unvisited.remove(minID) visited.append(minID) #find all neighbors of min nod and compare new distance to stored #update as necessary with the minimum for neighbor in self.graph.getNode(minID).adjs: if distance[neighbor[0]] > distance[minID]+neighbor[1]: distance[neighbor[0]] = distance[minID]+neighbor[1] last[neighbor[0]] = minID #draw last 20 visited nodes draw=[nodeID for nodeID in visited[-20:]] self.DrawList(draw,width=2,height=2) #once the end is reached, build the optimal path using minimum path list ("last") if minID == end: elapsed = (time() - begin) print "seconds to find goal: " + str(elapsed) self.path = [] nodeID = end while nodeID != start: self.path.append(nodeID) nodeID = last[nodeID] #move to next neighbor self.path.reverse() return class Astar2(Pathfind): label = "AStar Ala Dave" def FindPath(self, start, end): begin = time() #function to pull a subdictionary from a larger dictionary...thank you stackoverflow def sub_dict(somedict, somekeys, default=None): return dict([(k, somedict.get(k,default)) for k in somekeys ]) ##########initialization############# #1.build an empty dictionary of lists of distances from start and euclidean to end, with node IDs as keys #2.build an empty dictionary of last nodeIDs through minimum path (current node is dict key) distance = {} last = {} #3.set all nodal distances to infinity and dictionary of last IDs to empty for node in self.graph.nodeList: distance[node.nid] = [float('inf'), euclidDist(node.nid, end,self.graph)] last[node.nid] = "" #4.set start node distance to 0, euclidean distance to whatever it is distance[start] = [0,euclidDist(node.nid, end, self.graph)] #5.create a list of unvisited nodes (all nodes) and visited node IDs (no nodes) unvisited = [node.nid for node in self.graph.nodeList] visited =[] #######main algorithm############## while unvisited: #build a subdictionary of unvisited distances and their nodes unDist=sub_dict(distance,[nodeID for nodeID in unvisited]) #find minimum combined distance in subdictionary (sum of distance from start and euclid distance to end) minID = min(unDist,key=lambda i: sum(unDist[i])) minDist = unDist[minID] #remove min node from unvisited, add to visited unvisited.remove(minID) visited.append(minID) #compare minimum node's distance + distance to vertex with stored neighbor's distance from start for neighbor in self.graph.getNode(minID).adjs: if distance[neighbor[0]][0] > distance[minID][0]+neighbor[1]: distance[neighbor[0]][0] = distance[minID][0]+neighbor[1] last[neighbor[0]] = minID #draw last 20 visited nodes draw=[nodeID for nodeID in visited[-20:]] self.DrawList(draw,width=2,height=2) #once the end is reached, build the optimal path using minimum path list ("last") if minID == end: elapsed = (time() - begin) print str(elapsed) + "seconds to find goal" self.path = [] nodeID = end while nodeID != start: self.path.append(nodeID) nodeID = last[nodeID] self.path.reverse() return # Iterative Deepening Depth-First Search. A very slow algorithm, this is not advised # to be run on any large graph. # http://en.wikipedia.org/wiki/Iterative_deepening_depth-first_search class DepthFirst(Pathfind): label = "Depth-First" # Get all the vertices adjacent to the specified node ID def getAdj(self, node): tovisit = [] for n in self.graph.getAdjacencies(node): tovisit.append(n[0]) return tovisit # Do a depth-first search from node to goal, moving at most depth steps. def DepthFirst(self, node, goal, depth): if (depth == 0 and node == goal): return True elif (depth > 0): for nbr in self.getAdj(node): self.path.append(nbr) self.DrawList(self.path, width=1, height=1) #Draw the path found so far. if self.DepthFirst(nbr, goal, depth-1): return True self.path.pop() return False else: return False # Find a shortest path from node id sn to node id en by putting an increasing maximum distance # to do a depth-first search on. def FindPath(self, sn, en): self.path = [sn] for maxDepth in range(int(euclidDist(sn, en, self.graph)+ 0.5)/2, len(self.graph.nodeList)+1): if self.DepthFirst(sn, en, maxDepth): return self.path = None # A-Star pathfinding algorithm. # A greedy algorithm that uses an admissable heuristic, so it finds a guaranteed shortest path. # http://en.wikipedia.org/wiki/A-star class AStar(Pathfind): label = "A-Star" # Find the shortest path between the node id start and the node id goal, using A-Star. def FindPath(self, start, goal): begin = time() # Need an array keeping track of the cost to get to a node, # and what node it came from # This means that we can overwrite particular entries, # and do this search iteratively # So, we make a new array of tuples, # containing the previous nid and the cost to get there # List(costTo, prev) # If unself.visited, is None self.visited = [] if self.graph != None: for i in range(self.graph.getNodeCount()): self.visited.append(None) # The toVisit array contains a list of lists, containing the node, # where it came from, and cost. # Priority queue is based on this cost. # Maybe split up cost into to and from. # PriorityQueue toVisit = PriorityQueue() # Start of the algorithm self.visited[start] = (0, None) toVisit.put( (0 + euclidDist(start, goal, self.graph), start, None) ) draw = deque([]) while not toVisit.empty(): # Start of the main loop nextTuple = toVisit.get() #Get the next node next = nextTuple[1] nextDist = self.visited[next][0] #Total distance travelled to get to here. #Use this instead of the toVisit, as that is estimated dist. to goal # The nature of A-Star makes it difficult to determine what the current idea of what the best path is. # Instead of trying to do that, we draw the last 50 nodes explored. # original drawing code if len(draw) >= 50: draw.popleft() draw.append(next) self.DrawList(draw, width=2, height=2) if next == goal: #Success! elapsed = (time() - begin) print str(elapsed) + "seconds to find goal" lastPathNode = goal # Construct the path list self.pathCost = self.visited[goal][0] # 0 denotes cost to get to this node self.path = [] while self.visited[lastPathNode][1] != None: self.path.append(lastPathNode) lastPathNode = self.visited[lastPathNode][1] # 1 denotes the node we just came from self.path.append(start) self.path.reverse() return for adjacency in self.graph.getAdjacencies(next): # For each neightbor, see if have a path to nextNbr, cost = adjacency[0], adjacency[1] totalTravel = nextDist + cost if self.visited[nextNbr] == None: # The neighbor is unvisited self.visited[nextNbr] = ( totalTravel, next ) # Mark the node as visited goalDist = euclidDist(nextNbr, goal, self.graph) # Estimated distance (euclidean) toVisit.put( (totalTravel + goalDist, nextNbr, next) ) # Add node to priority queue elif totalTravel < self.visited[nextNbr][0]: # The neighbor has been visited, but that is shorter # if we've found a shorter route to nextNbr self.visited[nextNbr] = (totalTravel, next) # then replace old route to nextNbr with new goalDist = euclidDist(nextNbr, goal, self.graph) toVisit.put( (totalTravel + goalDist, nextNbr, next) ) #else: Node has been visited, but current path is equal or longer self.path = None return # A pathfinding algorithm that utilizes Rapidly-exploring Random Trees for pathfinding. # Trees are rapidly grown from the start and end nodes, and once they meet, a path is created # Using the branches of the trees created. # NOT guaranteed to be optimal (In fact, most often isn't) # http://en.wikipedia.org/wiki/Rapidly-exploring_random_tree # http://www.kuffner.org/james/plan/algorithm.php class RRT(Pathfind): label = "RRT" # Finds a path between the node id startID and the node id goalID by growing two trees, one rooted # at startID, the other at goalID def FindPath(self, startID, goalID): # Randomize the list of nodes, so we can iterate over it to get random nodes. nodeList = self.graph.getNodes()[:] nodeCount = len(nodeList) shuffle(nodeList) # Store the RRTs as dictionaries, mapping node ids to the node they came from. Separate dictionaries, # as there may be overlap, and each node points in a different direction. # JRS: Hmm, possible optimization? If we just had one dict, then when we looked up the # nearest neb and found it was in the other tree, maybe we could make a connection? # Would probably need to add a "color" variable to the tree...? startPathDictionary = {startID : None} goalPathDictionary = {goalID : None} # We swap the "primary" tree each pass currPathDict = startPathDictionary otherPathDict = goalPathDictionary # Determines which dictionary is which tree, for coloring purposes. Negated every pass. dictOneCurr = True # Get the nearest neighbor in the given dictionary to the nodeID specified. def NearestNbr(dictionary, nodeID): nearest = None dist = -1 for n in dictionary.keys(): d = euclidDist(n,nodeID, self.graph) if d < dist or dist < 0: dist = d nearest = n return nearest # Keep getting random nodes; for each one, grow both trees toward that randomly chosen node for i in range(nodeCount): qRand = nodeList[i] # Create a trajectory between the random node and the nearest neighbor, and traverse it as far as possible. qNbr = NearestNbr(currPathDict, qRand.nid) traj = PathGraph.Trajectory(self.graph, trajectory=[qNbr, qRand.nid]) traj.ConvertToPath() #rndtargnode = self.graph.getNode(qRand.nid)#jrs #print "Random node (end target): ",rndtargnode.getCoordinates(), " ", #jrs #qNbrnode = self.graph.getNode(qNbr)#jrs #print "Nearest nbr on tree (start node of extension): ", qNbrnode.getCoordinates() #jrs #print "Traj: ", #jrs #for anodeID in traj.path: #jrs # anode = self.graph.getNode(anodeID) # jrs # print anode.getCoordinates(), " " #jrs # Draw the new part of the RRT if dictOneCurr: self.DrawList(traj.path, clear=False, color='green') else: self.DrawList(traj.path, clear=False, color='red') # Add the part of the trajectory traversed to the main dictionary for index in range(1, len(traj.path)): nodeID = traj.path[index] currPathDict[nodeID] = traj.path[index-1] # Create a trajectory between the random node and the nearest neightbor in the OTHER dictionary, and traverse it as far as possible. qNbrOther = NearestNbr(otherPathDict, qRand.nid) trajOther = PathGraph.Trajectory(self.graph, trajectory=[qNbrOther, qRand.nid]) trajOther.ConvertToPath() # Draw the new part of the RRT if dictOneCurr: self.DrawList(trajOther.path, clear=False, color='red') else: self.DrawList(trajOther.path, clear=False, color='green') # Add the part of the trajectory traversed to the other dictionary for index in range(1, len(trajOther.path)): nodeID = trajOther.path[index] otherPathDict[nodeID] = trajOther.path[index-1] # If both trajectories reach the goal, then the trees intersect. if traj.completePath and trajOther.completePath: currNodeID = traj.path[-1] self.path = deque([currNodeID]) # Add the path from the start node dictionary while (currNodeID != startID): currNodeID = startPathDictionary[currNodeID] self.path.appendleft(currNodeID) currNodeID = traj.path[-1] # Add the path from the goal node dictionary while (currNodeID != goalID): currNodeID = goalPathDictionary[currNodeID] self.path.append(currNodeID) self.path = list(self.path) return else: # trees have not yet reached same randomly chosen node # Do the swaps! dictSwap = currPathDict currPathDict = otherPathDict otherPathDict = dictSwap dictOneCurr = not dictOneCurr #raw_input('Press the any key to continue! ') #jrs self.path = None