2014年12月10日 星期三

Setting up shared memory with robust pthread_mutex and pthread_cond


I've tried to experiment with shared memory.
Eventually, I figured out that you can use pthread_mutex and pthread_cond (condition variables) to synchronize access to a shared memory region.

During the process, I found out that it is very important to make the pthread_mutex "robust".

This so-called robust mutex is important because the mutex can be shared by multiple processes; if one process crashes without releasing the mutex, all the other processes could be stuck forever.

A robust mutex hands its ownership to the next process if the process that currently holds the mutex crashes.
In that case, pthread_mutex_lock() in the process that newly acquires the mutex returns "EOWNERDEAD".

When this happens, the new owner should be aware that the shared memory might have been left in an inconsistent state by the crash; therefore, its contents should be checked.

Once the shared memory and mutex have been recovered, you should call "pthread_mutex_consistent()" to return the mutex to a "consistent" state.



The source code needs to be built with "-lrt" and "-lpthread".

source code:


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/types.h> //shm_open
#include <stdio.h>  //printf
#include <stdlib.h> //exit
#include <unistd.h> //close
#include <string.h> //strerror
#include <pthread.h>//pthread_cond* pthread_mutex*
#include <time.h>   //clock_gettime, struct timespec

/* Name of the POSIX shared-memory object handed to shm_open().
 * This will be created under /dev/shm/ */
#define STATE_FILE "/program.shared" 

/* Define a struct we wish to share. Notice that we will allocate
 * only sizeof SHARED_VAR, so all sizes are constant.
 */
typedef struct
{
 int flags;             /* application value; set to 0 by the process that creates the object */
 pthread_mutex_t mutex; /* initialised by shared_mutex_init() as process-shared and robust */
 pthread_cond_t cond;   /* initialised by shared_cond_init() as process-shared */
}  SHARED_VAR;

/*
 * Initialise conf->mutex as a process-shared, robust mutex.
 *
 * conf: structure whose mutex member is to be initialised.
 *
 * Returns 0 on success and -1 on failure (a diagnostic is printed for the
 * step that failed). The temporary attribute object is destroyed on every
 * path once it has been initialised; the original leaked it on success.
 */
int shared_mutex_init(SHARED_VAR *conf)
{
 pthread_mutexattr_t mutexAttr;
 int result = -1;

 if(pthread_mutexattr_init(&mutexAttr) != 0){
  printf("failed to init mutex attributes\n");
  return -1;
 }

 do{

  /* Allow the mutex to be used by every process that maps it. */
  if(pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED) != 0 ){
   printf("failed to set PTHREAD_PROCESS_SHARED\n");
   break;
  }

  /* Robust: if the owner dies, the next locker gets EOWNERDEAD
   * instead of blocking forever. */
  if(pthread_mutexattr_setrobust(&mutexAttr, PTHREAD_MUTEX_ROBUST) != 0){
   printf("failed to set PTHREAD_MUTEX_ROBUST\n");
   break;
  }

  if(pthread_mutex_init(&(conf->mutex), &mutexAttr) != 0){
   printf("failed to init mutex\n");
   break;
  }

  printf("%s success\n", __func__);
  result = 0;

 }while(0);

 /* The attribute object is no longer needed once the mutex exists. */
 pthread_mutexattr_destroy(&mutexAttr);

 return result;
}


/*
 * Initialise conf->cond as a process-shared condition variable.
 *
 * conf: structure whose cond member is to be initialised.
 *
 * Returns 0 on success and -1 on failure (a diagnostic is printed for the
 * step that failed). The temporary attribute object is destroyed on every
 * path once it has been initialised; the original leaked it on success.
 */
int shared_cond_init(SHARED_VAR *conf)
{
 pthread_condattr_t condAttr;
 int result = -1;

 if(pthread_condattr_init(&condAttr) != 0){
  printf("failed to init cond attributes\n");
  return -1;
 }

 do{

  /* Allow the condition variable to be used by every process that maps it. */
  if(pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED) != 0 ){
   printf("failed to set PTHREAD_PROCESS_SHARED\n");
   break;
  }

  if(pthread_cond_init(&(conf->cond), &condAttr) != 0){
   /* The original printed "failed to init mutex" here (copy/paste slip). */
   printf("failed to init cond\n");
   break;
  }

  printf("%s success\n", __func__);
  result = 0;

 }while(0);

 /* The attribute object is no longer needed once the condvar exists. */
 pthread_condattr_destroy(&condAttr);

 return result;
}

/*
 * Demo entry point: create or attach to the shared-memory object STATE_FILE,
 * map one SHARED_VAR from it and run the lock / wait / signal skeleton on
 * the shared mutex and condition variable.
 *
 * Only the process that creates the object sets up flags, the mutex and the
 * condition variable. Re-initialising a mutex that other processes may
 * already be using is undefined behaviour, so later processes just attach.
 *
 * NOTE(review): a process that attaches between creation and the end of
 * initialisation can still see an uninitialised mutex; a real application
 * needs some "ready" handshake. That is not addressed here.
 *
 * The "0" conditions below are placeholders for application logic; the
 * original left them empty, which does not compile.
 *
 * Returns EXIT_SUCCESS on success, the saved errno when the object cannot be
 * opened, sized or mapped, and EXIT_FAILURE when initialisation or locking
 * fails.
 */
int main (void)
{
 int first = 0;
 int shm_fd;
 int savedErrno;
 int status = EXIT_SUCCESS;
 SHARED_VAR *conf;
 struct timespec abstime;
 int lockReturn;
 int condReturn;

 /* Try to open the shm instance with O_EXCL; this tests whether the shm
  * was already created by someone else. Descriptor 0 is a valid result,
  * so test for ">= 0" rather than "> 0". */
 if((shm_fd = shm_open(STATE_FILE, (O_CREAT | O_EXCL | O_RDWR),
     (S_IREAD | S_IWRITE))) >= 0 ) {
  first = 1; /* We are the first instance */
 }
 else if((shm_fd = shm_open(STATE_FILE, (O_CREAT | O_RDWR),
     (S_IREAD | S_IWRITE))) < 0) {
  /* Opening it normally, to share it with existing clients, failed too.
   * Save errno first: printf() may overwrite it. */
  savedErrno = errno;
  printf("Could not create shm object. %s\n", strerror(savedErrno));
  return savedErrno;
 }

 /* Set the size of the SHM to be the size of the struct. */
 if(ftruncate(shm_fd, sizeof(SHARED_VAR)) != 0){
  savedErrno = errno;
  printf("Could not size shm object. %s\n", strerror(savedErrno));
  close(shm_fd);
  return savedErrno;
 }

 /* Connect the conf pointer to the shared memory area,
  * with the desired permissions. */
 conf = mmap(NULL, sizeof(SHARED_VAR), (PROT_READ | PROT_WRITE),
     MAP_SHARED, shm_fd, 0);
 if(conf == MAP_FAILED){
  savedErrno = errno;
  printf("Could not map shm object. %s\n", strerror(savedErrno));
  close(shm_fd);
  return savedErrno;
 }

 if(first) {
  /* Run the one-time set up: default values and the shared primitives. */
  printf("First creation of the shm. Setting up default values\n");
  conf->flags = 0;

  if(shared_mutex_init(conf) != 0){
   printf("failed to create shared_mutex\n");
   status = EXIT_FAILURE;
  }
  else if(shared_cond_init(conf) != 0){
   printf("failed to create shared_cond\n");
   status = EXIT_FAILURE;
  }

  if(status != EXIT_SUCCESS){
   /* Remove the half-initialised object so that the next run
    * creates and initialises a fresh one. */
   shm_unlink(STATE_FILE);
   munmap(conf, sizeof(SHARED_VAR));
   close(shm_fd);
   return status;
  }
 }
 else
 {
  printf("Value of flags = %d\n", conf->flags);
 }

 do{

  lockReturn  = pthread_mutex_lock(&(conf->mutex));

  if(lockReturn ==  EOWNERDEAD){

   /* The previous owner died before releasing the mutex. We hold the
    * lock now; repair the shared data here, then tell the mutex that
    * the cleanup is done. */
   if(pthread_mutex_consistent(&(conf->mutex)) != 0){
    printf("failed to make mutex consistent\n");
    pthread_mutex_unlock(&(conf->mutex));
    status = EXIT_FAILURE;
    break;
   }
  }else if(lockReturn != 0){

   /* Didn't get the lock. Stop instead of spinning on an error that
    * will not go away (e.g. ENOTRECOVERABLE). */
   printf("failed to lock mutex. %s\n", strerror(lockReturn));
   status = EXIT_FAILURE;
   break;
  }

  if(0 /* placeholder: need to wait */){
   clock_gettime(CLOCK_REALTIME, &abstime);
   abstime.tv_sec += 5;
   condReturn = pthread_cond_timedwait(&(conf->cond), &(conf->mutex), &abstime);

   /* The wait re-acquires the mutex, so it can report a dead owner
    * exactly like pthread_mutex_lock() does. */
   if(condReturn == EOWNERDEAD){
    pthread_mutex_consistent(&(conf->mutex));
   }
  }

  if(0 /* placeholder: need to signal */){
   pthread_cond_signal(&(conf->cond));
  }

  /*do something*/

  pthread_mutex_unlock(&(conf->mutex));

 }while(0 /* placeholder: running() */);

 munmap(conf, sizeof(SHARED_VAR));
 close(shm_fd);
 return status;
}

2014年12月9日 星期二

How you receive an ethernet packet: from the ethernet driver to the BSD socket.


I've worked with lots of Ethernet driver developers and network application engineers in the past few years.
It surprises me how one person might have dedicated years of programming into writing Ethernet MAC drivers and BSD sockets, but never bothered to know how the data is passed from the hw driver through the IP stack to the BSD socket API.

I wanted to share this because I think sometimes it is better to know the whole picture.

p.s.
I've referenced the kernel source code, which I was currently working on; therefore, the function & variable names might differ in various kernels, however the idea is the same.

OVERVIEW:

[HW driver] --> lookup packet type(ipv4? ipv6? ...etc) --> lookup destination (do route, drop, or input ...) [if(input)]--> lookup protocol(tcp, udp, icmp ... etc) -->lookup socket --> copy to user_space

PATH:

net device driver calls "netif_receive_skb()"

netif_receive_skb(){

    //1. check and return struct packet_type *
    //2. exec packet_type->function
    //
    // example:
    //    returned : struct packet_type ipv6_packet_type{
    //            .func = ipv6_rcv(struct sk_buff)
    //            }
    //
    //    struct sk_buff{
    //            struct dst_entry ._skb_dst = ipv6_input()
    //    }
   
    packet_type->func();
}

                                                                                                                                                   
ipv6_rcv()
{
    //1. look up the destination(returns a "struct dst_entry *")
    //2. calls ip6_rcv_finish()
    NF_INET_PRE_ROUTING(ip6_rcv_finish())
}

ip6_rcv_finish(){
    // 1. exec dst_entry->input() by calling ip6_route_input()
    //
    //example:
    //destination is an ipv6 destination
    //
    //struct dst_entry
    //{
    //    .input = ip6_input
    //}
    //
    ip6_route_input() //ip6_route_input() looks up all possible routes if(input){dst_entry->input()}
   
}



ip6_input()
{
    //calls ip6_input_finish()
    NF_HOOK(ip6_input_finish());
}

ip6_input_finish()
{
    //1. lookup protocol by checking with the ip header(TCP or ICMP or UDP...etc)
    //2. returns a "struct inet_protocol"
    //3. exec inet_protocol.handler()
    //
    //example:
    //returns struct inet6_protocol tcpv6_protocol{
    //    .handler = tcp_v6_rcv()
    //    }
    //
   
    protocol.handler()  // according to example, calls tcp_v6_rcv()
}

tcp_v6_rcv()
{
    __inet6_lookup_skb() // lookup, match or create socket
        tcp_v6_do_rcv();
}

tcp_v6_do_rcv()
{
    tcp_rcv_established() -->skb_copy_datagram_iovec()-->memcpy_toiovec()-->copy_to_user()
}








others:

addrconf_dst_alloc() //create dst_entry