///////////////// The Session Layer                                                                      /////////////////

///////////////// Session Data Structures   /////////////////

            // queue ID’s for communication between session and data-link

            #define SELF 1     // different for each processor

 

            struct shared_mem {

                        char value;

                        int policy;

                        int id;

                        boolean postOK;

                        int publisher;

                        list *subscribers;

                        boolean update;   // flag for use in copy-on-read

}

shared_mem globals[N];

 

session_init() {

            g = 0;  // number of currently known shared globals

            }

///////////////// Shows how one Session Layer talks to another Session Layer //////////

///// Outgoing Messages //////////////////////           

            int publish(int dataID, int policy) { //msg = <PUB, publisher, data ID, policy>, broadcast

                        message[0] = PUBLISH;   // Message Type

                        message[1] = SELF;        // ID of Publisher

                        message[2] = dataID;       // ‘Address’ of the shared variable

                        message[3] = policy;        // update policy …

                        transport_send(0,message, 4);   // broadcast this message (can this block?)

                        return(0);

            }                      

           

            int subscribe(int dataID, int policy) { //msg = <SUB, subscriber, data ID, policy>, to publisher

                        startTimer();

                        // block until published or timeout

                        while ( (!tmp=lookup(globals,dataID)) && readTimer() < TIMEOUT);

                        if (tmp) { // publisher found

                        message[0] = SUBSCRIBE;    // Message Type

     message[1] = SELF;              //  I am the subscriber

                        message[2] = dataID;

                        message[3] = policy;              // subscription update policy

                        transport_send(tmp.publisher, message, 4) ; // send message to publisher

                        return(0)

            }

            send_signal(NETWORKTIMEOUT);  // signal to all tasks on this processor for error handling

}

 

int post(int dataID, char value) {

    if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item

    if (tmp.publisher == SELF) {

            tmp.value = value;     

            if (tmp.policy == CR) return(0);  // no need to do anything!

            else  {    // copy on write policy, need to inform subscribers

                foreach(p in tmp.subscribers) {

                    message[0] = POST;

                    message[1] = dataID;

                    message[2] = SELF;   // so that receiver can verify!

                    message[3] = value;  // send the data

                    transport_send(p, message, 4); // send post message to p

            }

     } // I am not publisher, I must be subscriber

      // send a post message...but don't update tmp.value. This can only happen as a result of receiving a POST.

     p = tmp.publisher

     message[0] = POST;

     message[1] = dataID;

     message[2] = SELF;

     message[3] = value;

     transport_send(p, message, 4);

     return 0;

}  

int update(int dataID, char *value) {

    if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item

    if (SELF not in tmp.subscribers) return(-ENOTSUBSCRIBER); 

    if (tmp.policy = CW) { 

        *value = tmp.value;

        return(0);

    }

    // its copy on read, so send a message!

   message[0] = UPDATE;

   message[1] = dataID;

   message[2] = SELF;   // so that receiver can verify

   transport_send(tmp.publisher, message, 3); // send post message to p

   tmp.update = false;

   sleep_on(update_q, TIMEOUT);                    // blocking

   if (!tmp.update) return (-EREADTIMEOUT);

   else *value = tmp.value;

}

 

 

///// Incoming Session Layer Messages //////////////////////

int session_recv(char *message, int len) {  // called by data link layer when a message is received

            if (len < 4) session_error();

            switch (message[0])

                        SUBSCRIBE:

                                    if (!tmp = lookup(message[2]))  session_error(message); // not published

                                    if (tmp.publisher != SELF) session_error(message);       // by me

                                    ir (tmp.policy != message[3]) session_error(message);  // policy mismatch

                                    add_subscriber(tmp.subcribers, message[1]);       // add to subscribers list

                                    return(0);                                  

                        PUBLISH:

                                    if (tmp = lookup(message[2])) session_error(message); // already published

                                    tmp = globals[g++];                                // fill in data structure

                                                tmp.policy = message[3];

                                                tmp.id = message[2];    

                                                tmp.publisher = message[1];

                                                return(0);

                        POST:  

                                    if (!tmp = lookup(message[1]) session_error(message); // not published

                                    if (SELF in tmp.subscribers) { // message from SUBSCRIBER

                                         tmp.value = message[3];

                                        if (tmp.policy == CR) { // must be blocked

                                            wake_on(update_q); // wake up reader(s)

                                            tmp.update = true;

                                    } else 

                                     if (SELF == tmp.publisher) post(message[1], message[3]); //

                                     else session_error(message); // not publisher or subscriber!

                                     return(0);

                        UPDATE:

                                    if (!tmp = lookup(message[1]) session_error(message); // not published

                                    if (tmp.publisher != SELF) session_error(message); // by me

                                    if (message[2] not in tmp.subscribers) session_error(message); // not sub

                                    subscriber = message[2]; // save subscriber info

                                    message = make_message(POST, tmp.dataID, SELF, tmp.value);

                                    session_send(subscriber, message, 4);  return(0);                                                        

                        default:  session_error(message); // have to deal with errors

}

 

 

 

 

////////////////// Data-link layer //////////////////////////

                                              

// keep a packet counter so that we can uniquely tag each packet for a period of time

transport_packet_counter = 0;

char *transport_packet;  // packet = <dst, src, number, checksum, length, session_msg>

          // or it can be <dst,src,number,0,0> = NACK

 

 

/////// Outgoing Transport Packets //////////

transport_send(int dst, char *session_msg, int length) {  // called from session layer with a  session message

packet = new transport_packet(session_msg, dst, length);

if (dst == SELF || dst == 0) transport_recv(packet);        // loopback in the event of SELF or Broadcast

if (dst != SELF) {                                                  // if not self, send to datalink layer

datalink_send(packet);

                        save_until _timeout(packet);

            }          

}

 

//////// Incoming Transport Packets ////////

transport_recv(char *packet, int length) { // called from datalink layer when a packet is received.

            if (isNACK(packet)) {  // receiver had a problem so retransmit if still saved

                        resend = get_saved _packet(get_packet_number(packet));  // lookup saved packet for resend

                        if (resend) datalink_send(resend);          

                        else transport_error(packet);                     // can no longer retransmit, so transport error has occurred

            } else {  // it must be normal packet

                        char *session_msg = transport_verify(packet, length); // validates header and checksum, extracts session message

                        If (session_msg)  // if null, then assume that verify failed so send a NACK

                        session_recv(session_msg);  // pass extracted message to session layer

else

                        datalink_send(make_NACK_packet(packet)); // send a NACK to the src of the packet.

                                                                                           // a NACK to a NACK should cause error, so don’t save

}

}

 

 

////////////////////////// Datalink Layer ////////////////////////

/////////// what transport needs to know about datalink ////////

datalink_send(char *packet) {

 

// here is the new version: a packet is converted into a set of frames appropriate for the datalink interface

// The frame format contains information needed to reconstruct the packet along with the dst address needed by i2C

 

            //  <packet> becomes   <dst, seq#, packet#, [part of packet]>  <dst, seq#, packet#, [part of packet]> ... <dst, seq#, packet#, [part of packet]> 

            // it should be something like this

            make_frames(packet); // chops packet up into frames and puts them on and internal queue.

            I2C_send_start();         // try to send

}

 

datalink_init() {

            master = false;  // we are not currently bus master

            recv = send = null; // clear the send and receive frame buffers

            // not shown: make queue of outgoing frames

            // see below for plug and play version of this function

}

 

////////// incoming events from the network interface /////////

                       

// In I2C, frames can be any length. So each packet is converted directly to a frame

// other datalink layers might have limited frame sizes so each packet might get split

// up into multiple frames.

 

i2C_recv_start_isr() { // start of a frame

            if (init) return;

            if (!master) {  // if I’m master then ignore (was sent by SELF)

            recv = null;        // otherwise, get ready to receive

                        recv_ix = 0;

                        wait_for_stop = false;

            } else {  // if I’m master then put address on bus

                        *PORT = send[send_ix++]; // start sending current frame

            }

}

 

i2C_send_start() {

            if (tryStart()) {                 // not guaranteed to succeed in i2C

                        master = true;   // if successful the become master and get a frame to send

                        send = get_frame(); // get a frame to send from the queue

                        send_ix = 0;      // initialize send counter

                        send_length = get_packet_length(send);

            }

}

 

 

i2C_recv_stop_isr() {

            if (init) return;

            if (wait_for_stop) {  // not our frame so ignore

wait_for_stop = false;

                        return;

            }

            if (!master)  { // just received a frame so send up to transport layer (frame == packet in i2C)

                        if (datalink_control_frame(recv)) process_control_frame(recv); // for plug-and-play

                        else packet = assemble_packet(recv, recv_ix);     // reassembles frames into packets, if packet is complete, return packet else null

if (packet) transport_recv(recv, recv_ix);

            }

            master = false;

            if (frames_to_send()) { // if outgoing frames are pending, then try to get bus

                        i2C_send_start();

            }

}

 

i2C_recv_byte_isr() {

            if (init) {ack = true; return;}

            if (wait_for_stop) return;  // ignore these events if message is not for me

            if (!master) {                   // must be slave so add to the incoming frame buffer

            if (recv_ix == 0)  { // first byte of incoming frame so process header                                   

            if (*PORT != SELF) {   // check if frame is for me

                                    wait_for_stop = true; // set to ignore until stop if not

                                    return;

            }

// otherwise, this frame is for me

// throw away first byte (datalink header)  (or use to authenticate packet!)

recv = new packet;  // start a new frame, which is a packet

}

else recv[recv_ix++] = *PORT;  // we’re in the middle of a frame so add new byte to buffer

            } else {  // we are master so send another byte from send frame buffer

                        if (send_ix == send_length) i2C_send_stop();  // frame is finished, so send a stop condition

                        else {    // we are in the middle of a send frame so continue

                                    *PORT = send[send_ix++];

                        }

}          

}

i2C_byte_error_isr() {

    if (init) { nack = true; return; }

    if (wait_for_stop) return;

    if (!master) { // the error was that this processor did not send an acknowledgement!

                      // but there doesn't seem to be anything we can do about it

                wait_for_stop = true;   // may as well ignore rest of message and hope for retransmit

                return;

    } else { // the receiver did not acknowledge the byte so we need to retransmit the frame

            if (get_xmit_count(send) >= MAXRETRY) datalink_error(frame);

            else datalink_frame_send(send) ; // just puts this frame back on the internal queue and increments 

                                                                // frame's retransmit counter

            i2C_send_stop() ; // abort this frame

    }

}

 

 

Question 1: How can we add plug and play to this?

    1. In init, get the bus and probe each address until no ack is received then claim the address (SELF = i)

 

boolean ack,nack;

i2C_init {

   init = true;

   for (i = 0; (i < MAX & !ack); i++) {

       while (! i2C_try_start());  // get control of the bus

        ack = nak = false;          // variables to be byte or error isr's

       *PORT = i;                    // send the byte

       while (!ack & !nack);    // wait for ack or nack

   }

    if (ack) {        

        SELF = i;

        datalink_frame_send(make_control_frame(0,NEWADDRESS, SELF)); // broadcast new addres

    }

    else datalink_error();

    init = false;

}

Question 2: How can we add watchdog to the system?

each processor periodically sends a ping message to SELF+1 modulo the highest address in the system.

should this be at the datalink, session, or transport level?

periodically send, if after a certain number of sends some flag is not cleared, then there is an error 

in the system.

           

 

Question 3: How to I do the pipes

    open(pipeID, READ)    // 

    open( pipeID, WRITE) // 

    read(pipeID);  //

    write(pipeID); //