///////////////// The Session Layer /////////////////
///////////////// Session Data
Structures /////////////////
//
queue ID’s for communication between session and data-link
#define
SELF 1 //
different for each processor
struct
shared_mem {
char
value;
int
policy;
int
id;
boolean
postOK;
int
publisher;
list *subscribers;
boolean update; // flag for use in copy-on-read
queue waiting; // for use with semaphore
}
shared_mem
globals[N];
session_init()
{
g = 0; // number of currently known shared globals
}
///// Outgoing Messages
//////////////////////
int
publish(int dataID, int policy) { //msg = <PUB, publisher, data ID,
policy>, broadcast
message[0]
= PUBLISH; //
Message Type
message[1]
= SELF; // ID of Publisher
message[2]
= dataID; // ‘Address’ of the shared variable
message[3]
= policy; // update policy …
transport_send(0,message,
4); // broadcast
this message (can this block?)
return(0);
}
int
subscribe(int dataID, int policy) { //msg = <SUB, subscriber, data
ID, policy>, to publisher
startTimer();
// block until published or timeout
while
( (!tmp=lookup(globals,dataID)) && readTimer() < TIMEOUT);
if
(tmp) { // publisher found
message[0]
= SUBSCRIBE; //
Message Type
message[1]
= SELF; // I am the subscriber
message[2] = dataID;
message[3] = policy; //
subscription update policy
transport_send(tmp.publisher,
message, 4) ; // send message to publisher
return(0)
}
send_signal(NETWORKTIMEOUT); // signal to all tasks on this processor for
error handling
}
int post(int dataID, char value) {
if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item
if (tmp.publisher == SELF) {
tmp.value = value;
if (tmp.policy == CR) return(0); // no need to do anything!
else { // copy on write policy, need to inform subscribers
foreach(p in tmp.subscribers) {
message[0] = POST;
message[1] = dataID;
message[2] = SELF; // so that receiver can verify!
message[3] = value; // send the data
transport_send(p, message, 4); // send post message to p
}
} // I am not publisher, I must be subscriber
// send a post message...but don't update tmp.value. This can only happen as a result of receiving a POST.
p = tmp.publisher
message[0] = POST;
message[1] = dataID;
message[2] = SELF;
message[3] = value;
transport_send(p, message, 4);
return 0;
}
int update(int dataID, char *value) {
if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item
if (SELF = tmp.publisher) *value = tmp.value;
if (SELF not in tmp.subscribers) return(-ENOTSUBSCRIBER);
if (tmp.policy = CW) {
*value = tmp.value;
return(0);
}
// its copy on read, so send a message!
message[0] = UPDATE;
message[1] = dataID;
message[2] = SELF; // so that receiver can verify
transport_send(tmp.publisher, message, 3); // send post message to p
tmp.update = false;
sleep_on(tmp.queue, TIMEOUT); // blocking
if (!tmp.update) return (-EREADTIMEOUT);
else *value = tmp.value;
}
/* if self is publisher, then check semaphore and continue or addQ and block */
/* if not, then send a WAIT message, addQ, and block; */
int wait(dataID) {
// only support CR (actually no copy at all)
if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to post to unpublished data item
if (tmp.publisher == SELF) {
if (tmp.value <= 0) {
sleep_on(tmp.queue);
}
else tmp.value--;
return 0;
} else
p = tmp.publisher;
message[0] = WAIT;
message[1] = dataID;
message[2] = SELF;
message[3] = value;
transport_send(p, message, 4);
sleep_on(tmp.queue);
}
return 0;
}
/* if SELF is publisher, then increment variable and get a waiting process off of the queue.
* if the waiting process is SELF then wake self, otherwise, send a SIGNAL message to the waiting process
* If SELF is not publisher, then send a signal message to the publisher and continue */
int signal(dataID) {
if (!tmp = lookup(dataID)) return (-ENOTPUBLISHED); // trying to signal to unpublished data item
if (tmp.publisher == SELF) {
tmp.value++;
task = tmp.deqQ();
if (task.getAddress() == SELF) {
wake(task);
} else {
p = tmp.publisher;
message[0] = SIGNAL;
message[1] = dataID;
message[2] = SELF;
transport_send(p, message, 3);
}
}
return 0;
}
///// Incoming Session Layer Messages
//////////////////////
int session_recv(char
*message, int len) { // called by data link layer when a message is received
if (len < 4) session_error();
switch (message[0])
SUBSCRIBE:
if
(!tmp =
lookup(message[2])) session_error(message); // not published
if
(tmp.publisher != SELF) session_error(message); // by me
ir
(tmp.policy != message[3]) session_error(message); // policy mismatch
add_subscriber(tmp.subcribers,
message[1]); // add to subscribers list
return(0);
PUBLISH:
if
(tmp =
lookup(message[2])) session_error(message); // already
published
tmp =
globals[g++]; // fill in data structure
tmp.policy
= message[3];
tmp.id
= message[2];
tmp.publisher
= message[1];
return(0);
POST:
if (!tmp = lookup(message[1]) session_error(message); // not published
if (SELF in tmp.subscribers) { // message from SUBSCRIBER
tmp.value = message[3];
if (tmp.policy == CR) { // must be blocked
wake_on(update_q); // wake up reader(s)
tmp.update = true;
} else
if (SELF == tmp.publisher) post(message[1], message[3]); //
else session_error(message); // not publisher or subscriber!
return(0);
UPDATE:
if (!tmp = lookup(message[1]) session_error(message); // not published
if (tmp.publisher != SELF) session_error(message); // by me
if (message[2] not in tmp.subscribers) session_error(message); // not sub
subscriber = message[2]; // save subscriber info
message = make_message(POST, tmp.dataID, SELF, tmp.value);
session_send(subscriber, message, 4); return(0);
WAIT:
// if SELF is not publisher then error. Never send a WAIT message to subscriber!
// otherwise, decrement and either send SIGNAL back or add subscriber to Q
// publisher is already blocked on the other side, waiting for a signal message!
if (!tmp = lookup(message[1]) session_error(message); // not published
if (tmp.publisher != SELF) session_error(message); // by me
if (tmp.value <= 0) addQ(tmp.queue, message[2]); // task address into wait queue but don't sleep!
else {
tmp.value--;
make_message(SIGNAL, tmp.dataID, SELF);
session_send(message[2], message, 3);
} return(0);
SIGNAL:
// if SELF, the publisher recvd signal. So increment count and deQ
// if deQ'd process is not SELF, send a signal command
// otherwise wake self
// Otherwise, the publisher has sent a message to the subscriber
// subscriber should do a local deQ and wake.
if (!tmp = lookup(message[1]) session_error(message); // not published
if (tmp.publisher == SELF) {
tmp.value++;
task = tmp.deQ(); // returns a process info struct or just a machine ID
if (task.getAddress() == SELF) wake(task);
// I'm cheating a little, task is either a task struct (local) or a machine id (remote)
else {
make_message(SIGNAL, tmp.dataID, SELF);
session_send(task, message, 3);
}
}
else {
task = tmp.deQ(); // wake any local process waiting on this queue
wake(task);
} return 0;
default: session_error(message); // have to deal with errors
}
////////////////// Data-link
layer //////////////////////////
// keep a
packet counter so that we can uniquely tag each packet for a period of time
transport_packet_counter =
0;
char *transport_packet; // packet
= <dst, src, number, checksum, length, session_msg>
// or it
can be <dst,src,number,0,0> = NACK
/////// Outgoing Transport Packets
//////////
transport_send(int
dst, char *session_msg, int length) { // called from session layer with a session
message
packet =
new transport_packet(session_msg, dst, length);
if
(dst ==
SELF || dst == 0) transport_recv(packet); // loopback in
the event of SELF or Broadcast
if
(dst
!= SELF) { // if not self,
send to datalink layer
datalink_send(packet);
save_until
_timeout(packet);
}
}
//////// Incoming Transport Packets
////////
transport_recv(char *packet, int length) { // called from datalink
layer when a packet is received.
if
(isNACK(packet)) { // receiver had a problem so retransmit if still saved
resend
= get_saved _packet(get_packet_number(packet)); // lookup saved packet for resend
if
(resend) datalink_send(resend);
else
transport_error(packet); // can no longer retransmit, so transport error has occurred
}
else { // it
must be normal packet
char
*session_msg = transport_verify(packet, length); //
validates header and checksum, extracts session message
If
(session_msg) //
if null, then assume that verify failed so send a NACK
session_recv(session_msg); // pass extracted
message to session layer
else
datalink_send(make_NACK_packet(packet));
// send a NACK to the src of the packet.
// a NACK to a
NACK should cause error, so don’t save
}
}
////////////////////////// Datalink Layer ////////////////////////
/////////// what transport needs to know
about datalink ////////
datalink_send(char *packet) {
//
here is the new version: a packet is converted into a set of frames appropriate
for the datalink interface
//
The frame format contains information needed to reconstruct the packet along with
the dst address needed by i2C
// <packet> becomes
<dst, seq#, packet#, [part of packet]> <dst, seq#, packet#, [part of packet]> ... <dst, seq#,
packet#, [part of packet]>
// it should be something like this
make_frames(packet);
// chops packet up into frames and puts them on and
internal queue.
I2C_send_start(); // try to
send
}
datalink_init() {
master
= false; // we
are not currently bus master
recv = send = null; // clear the send and receive frame buffers
// not shown: make queue of outgoing frames
// see below for plug and play version of this function
}
////////// incoming events from the
network interface /////////
// In I2C, frames can be any length. So
each packet is converted directly to a frame
// other datalink layers might have
limited frame sizes so each packet might get split
// up into multiple frames.
i2C_recv_start_isr() { // start of a frame
if (init) return;
if
(!master) { //
if I’m master then ignore (was sent by SELF)
recv = null; // otherwise, get ready to receive
recv_ix
= 0;
wait_for_stop
= false;
}
else { // if
I’m master then put address on bus
*PORT
= send[send_ix++]; // start sending current frame
}
}
i2C_send_start() {
if
(tryStart()) { // not guaranteed to succeed in i2C
master
= true; // if
successful the become master and get a frame to send
send
= get_frame(); // get a frame to send from the queue
send_ix
= 0; //
initialize send counter
send_length
= get_packet_length(send);
}
}
i2C_recv_stop_isr() {
if (init) return;
if
(wait_for_stop) { // not our frame so ignore
wait_for_stop
= false;
return;
}
if (!master) { // just received a frame so send up to transport layer (frame == packet in i2C)
if (datalink_control_frame(recv))
process_control_frame(recv); // for
plug-and-play
else
packet = assemble_packet(recv,
recv_ix); // reassembles frames into packets,
if packet is complete, return packet else null
if
(packet) transport_recv(recv,
recv_ix);
}
master
= false;
if
(frames_to_send()) { // if outgoing frames are
pending, then try to get bus
i2C_send_start();
}
}
i2C_recv_byte_isr() {
if (init) {ack = true; return;}
if
(wait_for_stop) return; // ignore these events if message is not for me
if
(!master) { // must be slave so add to the incoming frame buffer
if (recv_ix == 0) { // first byte of
incoming frame so process header
if (*PORT != SELF) { // check if frame
is for me
wait_for_stop
= true; // set to ignore until stop if not
return;
}
//
otherwise, this frame is for me
//
throw away first byte (datalink header)
(or use to authenticate packet!)
recv =
new packet; //
start a new frame, which is a packet
}
else
recv[recv_ix++] = *PORT; // we’re in the middle of a frame so add new byte to buffer
}
else { // we are
master so send another byte from send frame buffer
if
(send_ix == send_length) i2C_send_stop();
// frame is finished, so send a stop condition
else
{ // we are in
the middle of a send frame so continue
*PORT
= send[send_ix++];
}
}
}
i2C_byte_error_isr() {
if (init) { nack = true; return; }
if (wait_for_stop) return;
if (!master) { // the error was that this processor did not send an acknowledgement!
// but there doesn't seem to be anything we can do about it
wait_for_stop = true; // may as well ignore rest of message and hope for retransmit
return;
} else { // the receiver did not acknowledge the byte so we need to retransmit the frame
if (get_xmit_count(send) >= MAXRETRY) datalink_error(frame);
else datalink_frame_send(send) ; // just puts this frame back on the internal queue and increments
// frame's retransmit counter
i2C_send_stop() ; // abort this frame
}
Question 1: How can we add plug and play to this?
1. In init, get the bus and probe each address until no ack is received then claim the address (SELF = i)
boolean ack,nack;
i2C_init {
init = true;
for (i = 0; (i < MAX & !ack); i++) {
while (! i2C_try_start()); // get control of the bus
ack = nak = false; // variables to be byte or error isr's
*PORT = i; // send the byte
while (!ack & !nack); // wait for ack or nack
}
if (ack) {
SELF = i;
datalink_frame_send(make_control_frame(0,NEWADDRESS, SELF)); // broadcast new addres
}
else datalink_error();
init = false;
}
Question 2: How can we add watchdog to the system?
should this be at the datalink, session, or transport level?
periodically send, if after a certain number of sends some flag is not cleared, then there is an error
in the system.
Question 3: How to I do the pipes
open(pipeID, READ) //
open( pipeID, WRITE) //
read(pipeID); //
write(pipeID); //