///////////////// The Session Layer                                                                      /////////////////

///////////////// Session Data Structures   /////////////////

            // queue IDs for communication between session and data-link

            // Address of this node. It is written into outgoing messages as the
            // sender/publisher ID and compared against destination addresses.
            #define SELF 1     // different for each processor

 

            struct shared_mem {

                        char value;

                        int policy;

                        int id;

                        boolean postOK;

                        int publisher;

                        list *subscribers;

                            boolean update;   // flag for use in copy-on-read

                        queue waiting; // for use with semaphore

}

shared_mem globals[N];

 

// Resets the session layer: marks the globals[] table as empty by setting
// the used-entry counter g to zero. Existing entries are not cleared.
session_init() {

            g = 0;  // number of currently known shared globals

          }

///////////////// Shows how one Session Layer talks to another Session Layer //////////

///// Outgoing Messages //////////////////////           

            int publish(int dataID, int policy) { //msg = <PUB, publisher, data ID, policy>, broadcast

                        message[0] = PUBLISH;   // Message Type

                        message[1] = SELF;        // ID of Publisher

                        message[2] = dataID;       // ‘Address’ of the shared variable

                        message[3] = policy;        // update policy …

                        transport_send(0,message, 4);   // broadcast this message (can this block?)

                        return(0);

            }                      

           

            int subscribe(int dataID, int policy) { //msg = <SUB, subscriber, data ID, policy>, to publisher

                        startTimer();

                        // block until published or timeout

                        while ( (!tmp=lookup(globals,dataID)) && readTimer() < TIMEOUT);

                        if (tmp) { // publisher found

                        message[0] = SUBSCRIBE;    // Message Type

     message[1] = SELF;              //  I am the subscriber

                        message[2] = dataID;

                        message[3] = policy;              // subscription update policy

                        transport_send(tmp.publisher, message, 4) ; // send message to publisher

                        return(0)

            }

            send_signal(NETWORKTIMEOUT);  // signal to all tasks on this processor for error handling

}

 

int post(int dataID, char value) {

    if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item

    if (tmp.publisher == SELF) {

            tmp.value = value;     

            if (tmp.policy == CR) return(0);  // no need to do anything!

            else  {    // copy on write policy, need to inform subscribers

                foreach(p in tmp.subscribers) {

                    message[0] = POST;

                    message[1] = dataID;

                    message[2] = SELF;   // so that receiver can verify!

                    message[3] = value;  // send the data

                    transport_send(p, message, 4); // send post message to p

            }

     } // I am not publisher, I must be subscriber

      // send a post message...but don't update tmp.value. This can only happen as a result of receiving a POST.

     p = tmp.publisher

     message[0] = POST;

     message[1] = dataID;

     message[2] = SELF;

     message[3] = value;

     transport_send(p, message, 4);

     return 0;

}  

int update(int dataID, char *value) {

    if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item

    if  (SELF = tmp.publisher) *value = tmp.value;

    if (SELF not in tmp.subscribers) return(-ENOTSUBSCRIBER); 

    if (tmp.policy = CW) { 

        *value = tmp.value;

        return(0);

    }

    // its copy on read, so send a message!

   message[0] = UPDATE;

   message[1] = dataID;

   message[2] = SELF;   // so that receiver can verify

   transport_send(tmp.publisher, message, 3); // send post message to p

   tmp.update = false;

   sleep_on(tmp.queue, TIMEOUT);                    // blocking

   if (!tmp.update) return (-EREADTIMEOUT);

   else *value = tmp.value;

}

 

/* if self is publisher, then check semaphore and continue or addQ and block */

/*  if not, then send a WAIT message, addQ, and block; */

int wait(dataID) {

    // only support CR (actually no copy at all)

    if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item

    if (tmp.publisher == SELF) {

            if (tmp.value <= 0) {

                        sleep_on(tmp.queue);   

            }  

            else tmp.value--;

            return 0;

     }  else

            p = tmp.publisher;

            message[0] = WAIT;

            message[1] = dataID;

            message[2] = SELF;

            message[3] = value;

            transport_send(p, message, 4);

             sleep_on(tmp.queue);

    }

    return 0;

}

 

/* if SELF is publisher, then increment variable and get a waiting process off of the queue.

 * if the waiting process is SELF then wake self, otherwise, send a SIGNAL message to the waiting process 

 * If SELF is not publisher, then send a signal message to the publisher and continue */

int signal(dataID) {

    if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to signal to unpublished data item

    if (tmp.publisher == SELF) {

            tmp.value++;

            task = tmp.deqQ();

            if (task.getAddress() == SELF) {

                    wake(task);          

            }  else {

                p = tmp.publisher;

                message[0] = SIGNAL;

                message[1] = dataID;

                message[2] = SELF;

                transport_send(p, message, 3);

        }

    }

     return 0;

}

 

///// Incoming Session Layer Messages //////////////////////

int session_recv(char *message, int len) {  // called by data link layer when a message is received

            if (len < 4) session_error();

            switch (message[0])

                        SUBSCRIBE:

                                    if (!tmp = lookup(message[2]))  session_error(message); // not published

                                    if (tmp.publisher != SELF) session_error(message);       // by me

                                    ir (tmp.policy != message[3]) session_error(message);  // policy mismatch

                                    add_subscriber(tmp.subcribers, message[1]);       // add to subscribers list

                                    return(0);                                  

                        PUBLISH:

                                    if (tmp = lookup(message[2])) session_error(message); // already published

                                    tmp = globals[g++];                                // fill in data structure

                                                tmp.policy = message[3];

                                                tmp.id = message[2];    

                                                tmp.publisher = message[1];

                                                return(0);

                        POST:  

                                    if (!tmp = lookup(message[1]) session_error(message); // not published

                                    if (SELF in tmp.subscribers) { // message from SUBSCRIBER

                                         tmp.value = message[3];

                                        if (tmp.policy == CR) { // must be blocked

                                            wake_on(update_q); // wake up reader(s)

                                            tmp.update = true;

                                    } else 

                                     if (SELF == tmp.publisher) post(message[1], message[3]); //

                                     else session_error(message); // not publisher or subscriber!

                                     return(0);

                        UPDATE:

                                            if (!tmp = lookup(message[1]) session_error(message); // not published

                                      if (tmp.publisher != SELF) session_error(message); // by me

                                      if (message[2] not in tmp.subscribers) session_error(message); // not sub

                                      subscriber = message[2]; // save subscriber info

                                      message = make_message(POST, tmp.dataID, SELF, tmp.value);

                                      session_send(subscriber, message, 4);  return(0);

                        WAIT:      

                                    // if SELF is not publisher then error. Never send a WAIT message to subscriber!

                                    // otherwise, decrement and either send SIGNAL back or add subscriber to Q

                                    // publisher is already blocked on the other side, waiting for a signal message!

                                            if (!tmp = lookup(message[1]) session_error(message); // not published

                                     if (tmp.publisher != SELF) session_error(message); // by me

                                     if (tmp.value <= 0) addQ(tmp.queue, message[2]); // task address into wait queue but don't sleep!

                                     else {

                                            tmp.value--;

                                            make_message(SIGNAL, tmp.dataID, SELF);

                                            session_send(message[2], message, 3);

                                     } return(0);

                        SIGNAL:

                                    // if SELF, the publisher recvd signal. So increment count and deQ

                                    //      if deQ'd process is not SELF, send a signal command

                                    //      otherwise wake self

                                    // Otherwise, the publisher has sent a message to the subscriber

                                    //      subscriber should do a local deQ and wake.

                                          if (!tmp = lookup(message[1]) session_error(message); // not published

                                    if (tmp.publisher == SELF) {

                                            tmp.value++;

                                            task = tmp.deQ();  // returns a process info struct or just a machine ID

                                            if (task.getAddress() == SELF) wake(task);

                                            // I'm cheating a little, task is either a task struct (local) or a machine id (remote)

                                            else {

                                                make_message(SIGNAL, tmp.dataID, SELF);

                                                session_send(task, message, 3);

                                            }

                                     }

                                      else {

                                           task = tmp.deQ(); // wake any local process waiting on this queue

                                           wake(task);

                                     } return 0;

                                                        

                        default:  session_error(message); // have to deal with errors

}

 

////////////////// Data-link layer //////////////////////////

                                              

// keep a packet counter so that we can uniquely tag each packet for a period of time

int transport_packet_counter = 0;   // explicit type: file-scope declarations need one

// NOTE(review): transport_send() uses `transport_packet` as a type name
// (`new transport_packet(...)`), while this line declares a variable.
char *transport_packet;  // packet = <dst, src, number, checksum, length, session_msg>

          // or it can be <dst,src,number,0,0> = NACK

 

 

/////// Outgoing Transport Packets //////////

// Wraps a session message in a transport packet and sends it to dst.
//   dst          destination address; 0 means broadcast
//   session_msg  session-layer message bytes
//   length       number of bytes in session_msg
// Packets addressed to SELF or to broadcast are also delivered locally
// (loopback). Anything not addressed to SELF goes to the datalink layer and
// is kept for a while so that it can be retransmitted if a NACK comes back.
transport_send(int dst, char *session_msg, int length) {  // called from session layer with a session message

            packet = new transport_packet(session_msg, dst, length);

            // loopback in the event of SELF or Broadcast.
            // transport_recv takes the packet length as its second argument; the
            // original call omitted it.
            // NOTE(review): assumes get_packet_length() gives the full packet
            // length, header included - confirm.
            if (dst == SELF || dst == 0) transport_recv(packet, get_packet_length(packet));

            if (dst != SELF) {                                                  // if not self, send to datalink layer

                        datalink_send(packet);

                        save_until_timeout(packet);   // keep a copy for possible retransmission

            }

}

 

//////// Incoming Transport Packets ////////

transport_recv(char *packet, int length) { // called from datalink layer when a packet is received.

            if (isNACK(packet)) {  // receiver had a problem so retransmit if still saved

                        resend = get_saved _packet(get_packet_number(packet));  // lookup saved packet for resend

                        if (resend) datalink_send(resend);          

                        else transport_error(packet);                     // can no longer retransmit, so transport error has occurred

            } else {  // it must be normal packet

                        char *session_msg = transport_verify(packet, length); // validates header and checksum, extracts session message

                        If (session_msg)  // if null, then assume that verify failed so send a NACK

                        session_recv(session_msg);  // pass extracted message to session layer

else

                        datalink_send(make_NACK_packet(packet)); // send a NACK to the src of the packet.

                                                                                           // a NACK to a NACK should cause error, so don’t save

}

}

 

 

////////////////////////// Datalink Layer ////////////////////////

/////////// what transport needs to know about datalink ////////

// Queues a transport packet for transmission on the bus.
// The packet is converted into a set of frames appropriate for the datalink
// interface. Each frame carries what is needed to reconstruct the packet,
// plus the dst address needed by I2C:
//
//   <packet> becomes <dst, seq#, packet#, [part of packet]> ... <dst, seq#, packet#, [part of packet]>
datalink_send(char *packet) {

            make_frames(packet); // chops packet up into frames and puts them on an internal queue.

            i2C_send_start();    // try to send (spelled as the function is defined: lower-case i)

}

 

// Puts the datalink layer into its idle state: no frame in flight in either
// direction and this node not holding the bus.
datalink_init() {

            // clear the send and receive frame buffers
            send = null;
            recv = null;

            // we are not currently bus master
            master = false;

            // not shown: make queue of outgoing frames

            // see below for plug and play version of this function

}

 

////////// incoming events from the network interface /////////

                       

// In I2C, frames can be any length. So each packet is converted directly to a frame

// other datalink layers might have limited frame sizes so each packet might get split

// up into multiple frames.

 

i2C_recv_start_isr() { // start of a frame

            if (init) return;

            if (!master) {  // if I’m master then ignore (was sent by SELF)

            recv = null;        // otherwise, get ready to receive

                        recv_ix = 0;

                        wait_for_stop = false;

            } else {  // if I’m master then put address on bus

                        *PORT = send[send_ix++]; // start sending current frame

            }

}

 

i2C_send_start() {

            if (tryStart()) {                 // not guaranteed to succeed in i2C

                        master = true;   // if successful the become master and get a frame to send

                        send = get_frame(); // get a frame to send from the queue

                        send_ix = 0;      // initialize send counter

                        send_length = get_packet_length(send);

            }

}

 

 

i2C_recv_stop_isr() {

            if (init) return;

            if (wait_for_stop) {  // not our frame so ignore

wait_for_stop = false;

                        return;

            }

            if (!master)  { // just received a frame so send up to transport layer (frame == packet in i2C)

                        if (datalink_control_frame(recv)) process_control_frame(recv); // for plug-and-play

                        else packet = assemble_packet(recv, recv_ix);     // reassembles frames into packets, if packet is complete, return packet else null

if (packet) transport_recv(recv, recv_ix);

            }

            master = false;

            if (frames_to_send()) { // if outgoing frames are pending, then try to get bus

                        i2C_send_start();

            }

}

 

i2C_recv_byte_isr() {

            if (init) {ack = true; return;}

            if (wait_for_stop) return;  // ignore these events if message is not for me

            if (!master) {                   // must be slave so add to the incoming frame buffer

            if (recv_ix == 0)  { // first byte of incoming frame so process header                                   

            if (*PORT != SELF) {   // check if frame is for me

                                    wait_for_stop = true; // set to ignore until stop if not

                                    return;

            }

// otherwise, this frame is for me

// throw away first byte (datalink header)  (or use to authenticate packet!)

recv = new packet;  // start a new frame, which is a packet

}

else recv[recv_ix++] = *PORT;  // we’re in the middle of a frame so add new byte to buffer

            } else {  // we are master so send another byte from send frame buffer

                        if (send_ix == send_length) i2C_send_stop();  // frame is finished, so send a stop condition

                        else {    // we are in the middle of a send frame so continue

                                    *PORT = send[send_ix++];

                        }

}          

}

i2C_byte_error_isr() {

    if (init) { nack = true; return; }

    if (wait_for_stop) return;

    if (!master) { // the error was that this processor did not send an acknowledgement!

                      // but there doesn't seem to be anything we can do about it

                wait_for_stop = true;   // may as well ignore rest of message and hope for retransmit

                return;

    } else { // the receiver did not acknowledge the byte so we need to retransmit the frame

            if (get_xmit_count(send) >= MAXRETRY) datalink_error(frame);

            else datalink_frame_send(send) ; // just puts this frame back on the internal queue and increments 

                                                                // frame's retransmit counter

            i2C_send_stop() ; // abort this frame

    }

}

 

 

Question 1: How can we add plug and play to this?

    1. In init, get the bus and probe each address until no ack is received then claim the address (SELF = i)

 

boolean ack,nack;

i2C_init {

   init = true;

   for (i = 0; (i < MAX & !ack); i++) {

       while (! i2C_try_start());  // get control of the bus

        ack = nak = false;          // variables to be byte or error isr's

       *PORT = i;                    // send the byte

       while (!ack & !nack);    // wait for ack or nack

   }

    if (ack) {        

        SELF = i;

        datalink_frame_send(make_control_frame(0,NEWADDRESS, SELF)); // broadcast new addres

    }

    else datalink_error();

    init = false;

}

Question 2: How can we add a watchdog to the system?

Each processor periodically sends a ping message to SELF+1, modulo the highest address in the system.

Should this be at the datalink, session, or transport level?

Send periodically; if, after a certain number of sends, some flag has not been cleared, then there is an error

in the system.

           

 

Question 3: How do I do the pipes?

    open(pipeID, READ)    // 

    open( pipeID, WRITE) // 

    read(pipeID);  //

    write(pipeID); //