# Physical -- Routines for implementing fishnet on a physical network # most of the code is shared in common between the ipaq and # the emulation environment # # Differences (relating to the fact that the emulator gets the topology # from the trawler, and the ipaq gets the topology from broadcasting # to its neighbors): # # an emulated node gets its fishAddress from the trawler; on the ipaq # the user provides the (unique) fishAddress (see back of the ipaq) # before sending a packet, emulator should see if there are any # arp updates from trawler # if packet is to be broadcast, ipaq can broadcast; emulator should # unicast to all neighbors in the arp cache # if direct send, when arp lookup fails, emulator should print an error, while # ipaq should broadcast # # For now, we ignore link characteristics (loss, delay, bw) in emulation mode; # we could get this info from the trawler, and enforce it here require 'socket' require 'rbconfig.rb' $WINDOWS = (Config::CONFIG['host'] =~ /mswin32/) module Fishnet # UDPBroadcast / UDPPort is only used by iPAQs $UDPBroadcast = "10.0.1.255" $UDPPort = 8888 # default local port $MaxByteRate = 100000 # don't send more than 100KB/s # Routines for managing an emulated or ipaq fishnet # Note: on the IPAQ, the arp cache is not a complete list of neighbors, just # those who have sent us a message. So you can't use it to answer assignment 1! # # We keep an event queue on the emulated node and on the IPAQ, to: # defer sendPkt if we have sent too many packets recently, to avoid flooding the network # allow keyboard input to be delayed a fixed amount of time # to remember to call Node::onTimer class Physical < Manager @@Version = 3 # version num for all packets sent between machines @@Header = "nnnn" # string for how we pack/unpack the header @@HeaderSize = 8 @@MaxPhysPacket = MaxPacketSize + @@HeaderSize # Constructor is called with the fishnet address of the ipaq. def initialize(fishAddress, localPort) super() @sockets = [] @myAddress = fishAddress # fishAddress must be unique @localPort = localPort # this node listens to localPort @nextSend = 0 # when we can next send a packet @cmds = Commands.new # initialize the command parser @events = SortedEvents.new # list of deferred events @arp = Hash.new # list of neighbors @socket = openbind(nil, localPort) #@amph = AmphibianMgr.new(@sockets) #@amph.start(localHost, localPort+1000) # XXX @node = Node.new(self, fishAddress) # create user code #@node.amphibian = @amph #@amph.node = @node end attr_reader :node # for Command:processLine attr_reader :amph def openbind(addr, port) addr = 'localhost' unless addr log Debug::ALL, "Binding to #{addr}:#{port}." s = UDPSocket.open s.bind(addr, port) @sockets << s return s end # Start the node, then poll for incoming packets and keyboard commands def start() @now = fishTime @node.start defer = nil # make a list of sockets/files to select() on. # select() on stdin doesn't work on windows. @sockets << $stdin unless $WINDOWS socketlist = @sockets.map { |s| if s.respond_to?(:addr) a = s.addr "#{a[2]}:#{a[1]}" elsif s == $stdin "stdin" else s.inspect end }.join(", ") # log "Waiting for input on [" + socketlist + "]." loop do @now = fishTime if $fishInput if $fishInput.eof? $fishInput = nil elsif (!defer or defer <= @now) # process keyboard input defer = @cmds.processLine($fishInput.gets, @now) @events.push(defer, proc {}) if defer end end while @events.first and @events.first.timeToOccur <= @now # process a deferred event (such as a delayed send or wakeup) @events.shift.upcall.call end # process one pending incoming message. # it'd be nice to process them all, but if we get behind in # the event queue everything goes crazy. # (like: packets start sending out of order.) timeToWait = !@events.first ? nil : (@events.first.timeToOccur - @now)/1_000_000.0 if $WINDOWS timeToWait = $MaxWait if !timeToWait or timeToWait > $MaxWait end if ready = peek(@sockets, timeToWait) ready.each { |rin| if rin == $stdin @cmds.processLine($stdin.gets, fishTime) #elsif @amph.process(rin) # ... else # a fish socket. pkt, tuple = rin.recvfrom(@@MaxPhysPacket) inet, port, name, ipAddr = tuple rcvPkt(pkt, [ipAddr, port]) end } end end end # send a packet to its destination. If broadcast, send it to all neighbors. def sendPkt(from, to, pkt) super(to, pkt) # check that its legal refreshArp physPkt = [@@Version, to, @myAddress, 0].pack(@@Header) + pkt if (to == BroadcastAddress) then broadcastPkt(physPkt) elsif (ipInfo = @arp[to]) then physicalSend(physPkt, ipInfo) else noArp(to, physPkt) end end # unpack an incoming UDP packet, and deliver it to the application. # Note that we discard any packet that arrives, but isn't for us; # this can occur if we start listening on a UDP port that was recently # used by some other node. In the wireless case, everyone hears # all packets, and discards those that aren't for them. # # NOTE: since trawler tells us the arp cache contents, # we could add something that the trawler just told us to delete. # def rcvPkt(udpPkt, ipInfo) #log "Received msg from #{ipInfo[0]}:#{ipInfo[1]}" version, dest, from, tmp = udpPkt.unpack(@@Header) pkt = udpPkt[@@HeaderSize..udpPkt.length] if version != @@Version or !Fishnet::address?(from) $stderr.print "Received odd packet ", udpPkt else @arp[from] = ipInfo # add source address to arp cache if (dest == @myAddress or # and check that packet is for us dest == BroadcastAddress) then @node.onReceive(from, pkt) end end end # called by node to register a wakeup alarm x milliseconds in the future # NOTE: fishTime reports in microseconds, so we need to normalize x def wakeupIn(x) super(x) @events.push(fishTime + x*1000, proc { @node.onTimer(fishTime/1000) }) end # Send a packet, but defer sending it if we have sent something else recently def physicalSend(pkt, ipInfo) #log "Sending pkt to #{ipInfo[0]}:#{ipInfo[1]}" currentTime = fishTime if @nextSend <= currentTime then @socket.send(pkt, 0, ipInfo[0], ipInfo[1]) @nextSend = currentTime else @events.push(@nextSend, proc { @socket.send(pkt, 0, ipInfo[0], ipInfo[1]) }) end # compute when we can send the next packet, if there is one @nextSend += pkt.length*1000000/$MaxByteRate end # called by Node to get the current time, in milliseconds def now fishTime/1000 end end # FishnetPhysical # specialized routines for the ipaq class Ipaq < Physical DefaultPort = 8888 # use UDPPort as the local port for sending/receiving messages # there is only one fishnet process running on each IPAQ, so ok to fix the port def initialize(fishAddress, localPort=nil) $UDPPort = localPort if localPort super(fishAddress, "10.0.1.#{fishAddress}", $UDPPort) openbind("10.0.1.255", @localPort) end # refreshArp is only relevant to the emulator def refreshArp() end def start @socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, 1) super end # On the IPAQ, this is easy! def broadcastPkt(pkt) physicalSend(pkt, [$UDPBroadcast, $UDPPort]) end # if destination for pkt is not in the ARP cache, cast a wide net! def noArp(to, pkt) broadcastPkt(pkt) end # play a sound file. For now, just a stub. def pong super end end # FishnetIpaq # specialized routines for the emulator. See comments at the top of this # file for how the emulation environment differs from the IPAQ environment. class Emulator < Physical # connect to the trawler, and tell them about us. # Get back our fishnet address in return. def initialize(trawlerName, trawlerPort, localPort) begin @trawler = TCPSocket.new(trawlerName, trawlerPort) raise ArgumentError if !@trawler @trawler.sync = true @trawler.puts localPort fishAddr = @trawler.gets.chomp rescue SystemCallError # we get here if the trawler isn't accepting requests, or if # it accepted our socket, but then closed it before we can write to it $stderr.puts "Unable to connect to the trawler " + "at #{trawlerName}:#{trawlerPort}." raise end if !fishAddr or !Fishnet::address?(fishAddr.to_i) log Debug::ALL, "Illegal fish address #{fishAddr} returned from trawler." raise ArgumentError elsif fishAddr.to_i == BroadcastAddress # trawler returns the broadcast address to signal # there's already someone using the localPort log Debug::ALL, "Port #{localPort} already in use: choose another." raise ArgumentError else # success! log Debug::ALL, "Got fishAddr #{fishAddr} from trawler." super(fishAddr.to_i, localPort) end end # RefreshArp -- Make sure our arp cache is up to date, by checking to see # if the trawler has given us any updates # note: if we lose our connection to the trawler, @trawler.gets will return nil def refreshArp() while peek([@trawler], 0) and (str = @trawler.gets) do # log Debug::ALL, "Got arp from trawler #{str}" case str when /\A\s*reset/ @arp.clear when /\A\s*add:\s*(\d+)\s*(\d+\.\d+\.\d+\.\d+):(\d+)/ if Fishnet::address?(fishAddr = $1.to_i) @arp[fishAddr] = [$2, $3.to_i] end when /\A\s*remove:\s*(\d+)\s*/ @arp.delete $1 else $stderr.print "Unrecognized command from trawler\n" end end end # if packet is to be broadcast, need to send it once directly to each neighbor # in the ARP cache def broadcastPkt(pkt) @arp.each { |fishAddr, ipInfo| physicalSend(pkt, ipInfo) } end # if destination is not in the ARP cache, error message def noArp(dest, pkt) log Debug::PHYSICAL, "Not a neighbor to destination ", dest end end end # module Fishnet # vim: set ts=2 sw=2 noet :