""" BlimpBot interface """ # PORTS_TO_TRY contains the list of ports the BlimpBot interface will # attempt to connect to. Each number n corresponds to port COM(n+1), # so 2 means COM3, 3 means COM4, etc. PORTS_TO_TRY = (2,3,4,5) import sys, os, serial, time, math from threading import Thread, Lock from Queue import Queue, Empty time.clock() # make sure clock is counting LED_RED = 4 LED_GREEN = 0 LED_OFF = 0 LED_ON = 2 LED_TOGGLE = 1 LED_MASK = 8 FAN_VERTICAL = 1 FAN_RIGHT = 2 FAN_LEFT = 3 FANS = [FAN_VERTICAL, FAN_LEFT, FAN_RIGHT] FAN_SPEED_SIGN_BIT = 1 << 5 FAN_SPEED_MASK = 0b111111 TIMEOUT_SECONDS = .1 SERIAL_TIMEOUT = .01 # TODO: decrease # Height control # Sensor reading for default height TARGET_HEIGHT = 12. # These coefficients are multiplied by the difference # between the measured sensor value and the target value # to determine the vertical motor speed. BELOW_HEIGHT_COEFF = 1 ABOVE_HEIGHT_COEFF = .5 # These coefficients are multiplied by the difference between # the measured sensor velocity and the target velocity (zero), # to further adjust the vertical motor speed. BELOW_VEL_COEFF = 1 ABOVE_VEL_COEFF = .5 # Weights of the most recently read position and velocity values. # The higher the weights, the more impact the most recent values # have on the averaged position and velocity values used for # height control. NEW_POSITION_WEIGHT = .3 NEW_VELOCITY_WEIGHT = .3 HEIGHT_CONTROLLER_P = .5 HEIGHT_CONTROLLER_I = .5 HEIGHT_CONTROLLER_D = .5 class BlimpBotFan(object): """ Manages the speed of a single fan. """ def __init__(self, fan, interface): self.fan = fan # the fan to use self.vel = 0 # fan's current velocity (0 to 63) self.desired_vel = 0 # the velocity we want (0 to 63) self.interface = interface # BlimpBot command interface to use interface.add_handler(self) self.packet_sent = time.clock() # time in seconds when the last packet was sent. self.lock = Lock() # prevent the wxPython thread and serial thread # from accessing at the same time. self.is_on = False def set_vel(self, vel): self.lock.acquire() speed = int(vel * 31) if speed < -31: speed = -31 elif speed > 31: speed = 31 speed = speed & FAN_SPEED_MASK #print "Speed: %3d == %10s" % (speed, bin(speed)) self.desired_vel = speed self.lock.release() def on_response(self, response): """ Called when a response packet is received. """ # Ignore response if it's not for us #if response[0] >> 5 != 5 or response & 3 != self.fan: return #print "Received response in fan %d" % self.fan def on_update(self): """ Called when no response packets are received for a brief time period. We'll check for timeout, and re-send if necessary. """ #print "Updating fan %d" % self.fan self.lock.acquire() if time.clock() - self.packet_sent > TIMEOUT_SECONDS: #print "Resending..." self.send_command() self.lock.release() def send_command(self): if not self.is_on: return #print "Sending command from fan %d..." % self.fan self.interface.send_command((self.fan << 6) | self.desired_vel) self.packet_sent = time.clock() def turn_on(self): self.is_on = True def turn_off(self): print "Turned off" self.is_on = False class BlimpBotIRSensors(object): def __init__(self, interface): self.interface = interface interface.add_handler(self) self.values = [0] * 5 self.vals_received = 0 self.lock = Lock() self.callback = None self.height_control_callback = None def set_height_control_callback(self, callback): self.height_control_callback = callback def turn_on(self): self.lock.acquire() #print "Sending command for sensors..." self.vals_received = 0 self.interface.send_command(63) self.lock.release() def turn_off(self): self.interface.send_command(1 << 5) def on_update(self): pass def on_response(self, response): """ Called when a response packet is received. """ # Ignore response if it's not for us if response[0] >> 6 != 1: return self.lock.acquire() response_byte = 1 try: for i in xrange(5): if not (response[0] & (1 << i)): continue self.values[i] = response[response_byte] + (response[response_byte + 1] << 8) response_byte += 2 if self.callback is not None: self.callback(i, self.values[i]) # Adjust bottom motor to do height control if i == 4 and self.height_control_callback: self.height_control_callback(self.values[i]) except Exception as e: print "Error in IR updating: %s" % e self.lock.release() class BlimpBotInterface(object): """ Serial interface to the BlimpBot. """ def __init__(self): """Initializes the interface.""" self.connect() self.handlers = [] poll_thread = Thread(target=self.poll_serial_port) poll_thread.daemon = True poll_thread.start() def connect(self): """Connects to the first available COM port.""" ports = list(PORTS_TO_TRY) while True: try: if not ports: print "No ports available." port = ports.pop(0) self.ser = serial.Serial(port) print "***** Successfully connected to COM%d *****" % (port + 1) print "=" * 80 break except Exception as ex: print "Failed to connect to serial port COM%d\nDetails:\n%s" % (port + 1, ex) if ports: print "***** Let's try another port! *****" else: print "=" * 80 print "No more ports left to try. Make sure RealTerm is not using the" print "port! If that doesn't work, please unplug the USB cable from the" print "computer, close Code Composer, close RealTerm, plug the cable in," print "then try again." # TODO: exit more nicely sys.exit(1) print "-" * 80 def poll_serial_port(self): data_block = [] # True if the last data byte we read was the escape code (254, or \xFE) has_read_escape_code = False try: while True: #try: data_str = self.ser.read(1) if data_str: for data in data_str: #print "Received %d" % ord(data) if data == "\xFF": print "Received block %s" % data_block for handler in self.handlers: handler.on_response(data_block) data_block = [] has_read_escape_code = False elif data == "\xFE": has_read_escape_code = True else: if has_read_escape_code: if ord(data) == 0: data_block.append(254) elif ord(data) == 1: data_block.append(255) else: data_block.append(ord(data)) has_read_escape_code = False for handler in self.handlers: #print "Updating..." handler.on_update() # TODO: handle exceptions better # TODO: do more than just print the returned data # TODO: have another thread handle timed-out transmissions. #except Exception as e: # print "Error: %s" % e finally: self.ser.close() def send_command(self, cmd): """ Sends a single command. """ #print "Sending command %d" % cmd self.ser.write(self.encode_byte(chr(cmd)) + "\xFF") def send_long_command(self, cmd): #print "Sending command %s" % cmd """ Sends a string of commands. """ self.ser.write([self.encode_byte(x) for x in cmd] + "\xFF") def encode_byte(self, byte): if byte == "\xFF": return "\xFE\x01" elif byte == "\xFE": return "\xFE\x00" else: return byte def add_handler(self, h): self.handlers.append(h) class InterpolatingTable(object): """ Table that uses linear interpolation to map x to y. """ def __init__(self, data=None): self.pts = [] if data is None else data def add_pt(self, x, y): """ Adds a point to the linear interpolation table. """ if len(self.pts) == 0: self.pts.append((x, y)) return i for i in xrange(len(self.pts)): if x < pts[i]: self.pts.insert(i, x) return i self.pts.append((x, y)) return len(self.pts) - 1 def evaluate(self, my_x): """ Evaluates a point, returning a linearly-interpolated value for the nearest point in the look-up table. """ last_x = last_y = None for x, y in self.pts: if my_x < x: if last_x is None: return y elif my_x > last_x: return last_y + (my_x - last_x) * (y - last_y) / float(x - last_x) last_x = x last_y = y return last_y def get_pts(self): """ Returns the array of control points. """ return self.pts def remove_pt(self, index): """ Removes the indicated point from the table. """ del self.pts[index] class HeightController(object): def __init__(self, ir_sensors, fan_control_func): ir_sensors.set_height_control_callback(self.height_control_callback) self.fan_control_func = fan_control_func self.use_height_control = False self.position = self.target_position = TARGET_HEIGHT self.velocity = 0 self.last_time = time.clock() self.interp_table = None # These are functions that users of the HeightController, # like the GUI, can set. # This one is called when the PID controller changes the value. self.callback = None # This one is called when the filtered height is updated from # a sensor value. self.height_callback = None # PID coefficients self.P = HEIGHT_CONTROLLER_P self.I = HEIGHT_CONTROLLER_I self.D = HEIGHT_CONTROLLER_D self.filter_weight = NEW_POSITION_WEIGHT self.stddev = 10 # Start off with no error self.error_integral = 0 # When height control is enabled, we'll log the PID values here. self.pid_log = None def set_interp_table(self, t): self.interp_table = t def set_callback(self, callback): """ Sets the optional callback to call when the height controller sets the fan. """ self.callback = callback def set_height_callback(self, callback): """ Sets the optional callback to call when the height controller gets a new averaged height. """ self.height_callback = callback def height_control_callback(self, position): """ This function is called when a height sensor reading is read. It uses a PID controller to make the blimp's position match the position set point, which is set when the height control is enabled. """ # Determine new position, using a weighted average to dampen any wild, # transient values. old_position = self.position if self.interp_table is not None: position = self.interp_table.evaluate(position) filter_weight = self.filter_weight * math.exp(-(position - old_position)**2/(2*self.stddev**2)) new_position = position * filter_weight + \ old_position * (1 - filter_weight) self.position = new_position if self.height_callback: self.height_callback(new_position) # Skip the rest of the code if we're not controlling the height. if not self.use_height_control: return # PID controller ----------------------------------------------- print "P: %.2f, I: %.2f, D: %.2f, filter: %.2f, stddev: %.2f" % \ (self.P, self.I, self.D, filter_weight, self.stddev) # Get the error term for PID calculations error = new_position - self.target_position old_error = old_position - self.target_position # Find out how much time has passed, then reset our clock cur_time = time.clock() delta_t = cur_time - self.last_time self.last_time = cur_time # Estimate the integral by adding the area of the *trapezoid* # for the error terms. self.error_integral = (self.error_integral * .25 + ((error + old_error) / 2) * delta_t) if self.error_integral < -500: self.error_integral = -500 elif self.error_integral > 500: self.error_integral = 500 # Estimate the derivative term error_derivative = (error - old_error) / delta_t # Now, calculate the fan velocity using the coefficients fan_vel = error * self.P + \ self.error_integral * self.I + \ error_derivative * self.D # Set the fan velocity by calling the fan control function given in the # constructor. self.fan_control_func(fan_vel) # Log the PID values self.pid_log.write(','.join(["%f" % x for x in cur_time, position, new_position, error, self.error_integral, error_derivative, self.P * error, self.I * self.error_integral, self.D * error_derivative, fan_vel ]) + '\n') # The GUI might give this object a callback, so it can report what values # it's getting. if self.callback: self.callback( error = error, derivative = error_derivative, integral = self.error_integral, time = self.last_time, delta_t = delta_t ) def start_height_control(self): self.use_height_control = True self.target_position = self.position # Log the PID values self.pid_log = open(os.path.join(os.path.dirname(__file__), 'pid_log.csv'), 'w') self.pid_log.write("Time (seconds),Sensed Height,Filtered Height,Error,Integral,Derivative,Error * P,Integral * I, Derivative * D, Fan Velocity\n") def stop_height_control(self): self.use_height_control = False def is_active(self): return self.use_height_control def load_config(self, dict): """ Reads BlimpBot settings from a dictionary. """ self.P = float(dict.get('Proportional_Constant', self.P)) self.I = float(dict.get('Integral_Constant', self.I)) self.D = float(dict.get('Derivative_Constant', self.D)) self.stddev = float(dict.get('StdDev', self.stddev)) self.filter_weight = float(dict.get('Position_Filter_Constant', self.filter_weight)) interp_table_pts = dict.get('Height_Lookup_Table', None) if interp_table_pts: interp_table_pts = [map(float, x.split(',')) for x in interp_table_pts.split('|')] self.interp_table = InterpolatingTable(interp_table_pts) def save_config(self, dict): """ Writes BlimpBot settings to a dictionary. """ dict['Proportional_Constant' ] = self.P dict['Integral_Constant' ] = self.I dict['Derivative_Constant' ] = self.D dict['StdDev' ] = self.stddev dict['Position_Filter_Constant'] = self.filter_weight if self.interp_table: dict['Height_Lookup_Table'] = '|'.join([','.join(map(str,x)) for x in self.interp_table.get_pts()]) class BlimpBot(object): def __init__(self, interface=None): if interface: self.interface = interface else: self.interface = BlimpBotInterface() self.fans = {} for fan in FANS: self.fans[fan] = BlimpBotFan(fan, self.interface) self.ir_sensors = BlimpBotIRSensors(self.interface) self.height_controller = HeightController(self.ir_sensors,\ self._set_vertical_fan_vel) def _set_vertical_fan_vel(self, vel): self._set_fan_vel(FAN_VERTICAL, vel) def set_fan_vel(self, fan, vel): """ Sets the velocity of the given fan. - fan can be FAN_VERTICAL, FAN_LEFT, or FAN_RIGHT, - vel is between -1.0 and 1.0 inclusive. """ if fan == FAN_VERTICAL and self.height_controller.is_active(): return self._set_fan_vel(fan, vel) def _set_fan_vel(self, fan, vel): self.fans[fan].set_vel(vel) def set_led(self, led, state): self.interface.send_command(LED_MASK | LED_OFF | LED_TOGGLE) def set_ir_callback(self, callback): self.ir_sensors.callback = callback def start_height_control(self): self.height_controller.start_height_control() def stop_height_control(self): self.height_controller.stop_height_control() def load_config(self, dict): """ Reads BlimpBot settings from a dictionary. """ self.height_controller.load_config(dict) def save_config(self, dict): """ Writes BlimpBot settings to a dictionary. """ self.height_controller.save_config(dict) def start(self): self.is_on = True print "Starting" self.interface.send_command(1) for fan in self.fans.itervalues(): fan.turn_on() self.ir_sensors.turn_on() def stop(self): # TODO: make this send an actual stop command print "Stopping" for fan in self.fans.itervalues(): fan.turn_off() self.ir_sensors.turn_off() self.is_on = False self.interface.send_command(0)