Helixis 1.0
Task Programming API
sources/liblfds/queue/queue_queue.c
Go to the documentation of this file.
00001 #include "queue_internal.h"
00002 #if defined(HLX_BUILD_WITH_PARALLEL_THREADING)
00003 extern hlx_api gl_api;
00004 
00005 
00006 
00007 /****************************************************************************/
00008 int queue_enqueue( struct queue_state *qs, void *user_data )
00009 {
00010   ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element
00011     *qe[QUEUE_PAC_SIZE];
00012 
00013   /* TRD : user_data can be 0 */
00014 
00015   queue_internal_new_element_from_freelist( qs, qe, user_data );
00016 
00017   if( qe[QUEUE_POINTER] == 0 )
00018     return( 0 );
00019 
00020   queue_internal_queue( qs, qe );
00021 
00022   return( 1 );
00023 }
00024 
00025 
00026 
00027 
00028 
00029 /****************************************************************************/
00030 int queue_guaranteed_enqueue( struct queue_state *qs, void *user_data )
00031 {
00032   ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element
00033     *qe[QUEUE_PAC_SIZE];
00034 
00035   /* TRD : user_data can be 0 */
00036 
00037   queue_internal_guaranteed_new_element_from_freelist( qs, qe, user_data );
00038 
00039   if( qe[QUEUE_POINTER] == 0 )
00040     return( 0 );
00041 
00042   queue_internal_queue( qs, qe );
00043 
00044   return( 1 );
00045 }
00046 
00047 
00048 
00049 
00050 
00051 /****************************************************************************/
/* Links the prepared element pair qe onto the end of the queue qs.

   qs : queue state; only qs->enqueue is read and updated here
   qe : pointer/counter pair for the new element; its counter slot is
        overwritten by this function before each DCAS that installs it

   The function loops until the DCAS that installs qe as the "next" of the
   element currently referenced by qs->enqueue reports a non-zero result,
   then makes one unchecked attempt to move qs->enqueue onto qe.

   NOTE(review): gl_api.atomic_dcas_entry is used as if its arguments were
   (destination, exchange, compare) and as if it returned non-zero on
   success - confirm against the hlx_api definition.

   NOTE(review): the counter slots are declared as struct queue_element *,
   so every "+ 1" below is pointer arithmetic (a step of
   sizeof(struct queue_element)), not an integer increment.
*/
00052 void queue_internal_queue( struct queue_state *qs, struct queue_element *qe[QUEUE_PAC_SIZE] )
00053 {
00054   ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element
00055     *enqueue[QUEUE_PAC_SIZE],
00056     *next[QUEUE_PAC_SIZE];
00057 
00058   unsigned char
00059     cas_result = 0;
00060 
00061 
00062   do
00063   {
    /* local snapshot of the queue's enqueue pointer/counter pair */
00064     enqueue[QUEUE_POINTER] = qs->enqueue[QUEUE_POINTER];
00065     enqueue[QUEUE_COUNTER] = qs->enqueue[QUEUE_COUNTER];
00066 
    /* local snapshot of the next pair of the element that snapshot points to */
00067     next[QUEUE_POINTER] = enqueue[QUEUE_POINTER]->next[QUEUE_POINTER];
00068     next[QUEUE_COUNTER] = enqueue[QUEUE_POINTER]->next[QUEUE_COUNTER];
00069 
00070     /* TRD : this if() ensures that the next we read, just above,
00071              really is from qs->enqueue (which we copied into enqueue)
00072     */
00073 
00074     if( qs->enqueue[QUEUE_POINTER] == enqueue[QUEUE_POINTER] and qs->enqueue[QUEUE_COUNTER] == enqueue[QUEUE_COUNTER] )
00075     {
00076       if( next[QUEUE_POINTER] == 0 )
00077       {
        /* the snapshotted element has no successor: try to install qe as
           its next; this is the only place cas_result is assigned, so it
           is the only way out of the loop
        */
00078         qe[QUEUE_COUNTER] = next[QUEUE_COUNTER] + 1;
00079         cas_result = gl_api.atomic_dcas_entry( (volatile atom_t *) enqueue[QUEUE_POINTER]->next, (atom_t *) qe, (atom_t *) next );
00080       }
00081       else
00082       {
        /* a successor already exists, so qs->enqueue is not on the last
           element: try to move it forward onto that successor (result
           ignored) and go round the loop again
        */
00083         next[QUEUE_COUNTER] = enqueue[QUEUE_COUNTER] + 1;
00084         gl_api.atomic_dcas_entry( (volatile atom_t *) qs->enqueue, (atom_t *) next, (atom_t *) enqueue );
00085       }
00086     }
00087   }
00088   while( cas_result == 0 );
00089 
  /* qe is now linked in; try once to point qs->enqueue at it.  The result
     is not checked - presumably a failure means qs->enqueue has already
     been moved by another caller (see the else branch above); confirm.
  */
00090   qe[QUEUE_COUNTER] = enqueue[QUEUE_COUNTER] + 1;
00091   gl_api.atomic_dcas_entry( (volatile atom_t *) qs->enqueue, (atom_t *) qe, (atom_t *) enqueue );
00092 
00093   return;
00094 }
00095 
00096 
00097 
00098 
00099 
00100 /****************************************************************************/
/* Attempts to take one item off the dequeue end of the queue qs.

   qs        : queue state; qs->dequeue, qs->enqueue and qs->fs are used
   user_data : out parameter; receives the user_data of the element that
               follows the one qs->dequeue pointed to, or 0 when the queue
               was observed empty

   Returns 1 when an item was dequeued, 0 when the queue was observed empty.

   On a successful dequeue the element that qs->dequeue previously pointed
   to has its fe member handed to freelist_push().

   Note that *user_data is written before the DCAS on qs->dequeue, so it can
   be written on an attempt that then fails and is retried.

   NOTE(review): gl_api.atomic_dcas_entry is used as if its arguments were
   (destination, exchange, compare) and as if it returned 1 on success -
   confirm against the hlx_api definition.

   NOTE(review): the counter slots are declared as struct queue_element *,
   so every "+ 1" below is pointer arithmetic (a step of
   sizeof(struct queue_element)), not an integer increment.
*/
00101 int queue_dequeue( struct queue_state *qs, void **user_data )
00102 {
00103   ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element
00104     *enqueue[QUEUE_PAC_SIZE],
00105     *dequeue[QUEUE_PAC_SIZE],
00106     *next[QUEUE_PAC_SIZE];
00107 
00108   unsigned char
00109     cas_result = 0;
00110 
00111   int
00112     rv = 1,
00113     state = QUEUE_STATE_UNKNOWN,
00114     finished_flag = LOWERED;
00115 
00116 
00117   do
00118   {
    /* local snapshots, taken in this order: the dequeue pair, the enqueue
       pair, then the next pair of the element the dequeue snapshot
       points to
    */
00119     dequeue[QUEUE_POINTER] = qs->dequeue[QUEUE_POINTER];
00120     dequeue[QUEUE_COUNTER] = qs->dequeue[QUEUE_COUNTER];
00121 
00122     enqueue[QUEUE_POINTER] = qs->enqueue[QUEUE_POINTER];
00123     enqueue[QUEUE_COUNTER] = qs->enqueue[QUEUE_COUNTER];
00124 
00125     next[QUEUE_POINTER] = dequeue[QUEUE_POINTER]->next[QUEUE_POINTER];
00126     next[QUEUE_COUNTER] = dequeue[QUEUE_POINTER]->next[QUEUE_COUNTER];
00127 
00128     /* TRD : confirm that dequeue didn't move between reading it
00129              and reading its next pointer
00130     */
00131 
00132     if( dequeue[QUEUE_POINTER] == qs->dequeue[QUEUE_POINTER] and dequeue[QUEUE_COUNTER] == qs->dequeue[QUEUE_COUNTER] )
00133     {
      /* the three tests below are mutually exclusive and between them
         cover every case, so state is always assigned before the switch
      */
00134       if( enqueue[QUEUE_POINTER] == dequeue[QUEUE_POINTER] and next[QUEUE_POINTER] == 0 )
00135         state = QUEUE_STATE_EMPTY;
00136 
00137       if( enqueue[QUEUE_POINTER] == dequeue[QUEUE_POINTER] and next[QUEUE_POINTER] != 0 )
00138         state = QUEUE_STATE_ENQUEUE_OUT_OF_PLACE;
00139 
00140       if( enqueue[QUEUE_POINTER] != dequeue[QUEUE_POINTER] )
00141         state = QUEUE_STATE_ATTEMPT_DEQUEUE;
00142 
00143       switch( state )
00144       {
        /* both ends reference the same element and it has no successor:
           report empty and leave the loop
        */
00145         case QUEUE_STATE_EMPTY:
00146           *user_data = 0;
00147           rv = 0;
00148           finished_flag = RAISED;
00149         break;
00150 
        /* both ends reference the same element but it does have a
           successor: try to move qs->enqueue onto that successor (result
           ignored), then retry
        */
00151         case QUEUE_STATE_ENQUEUE_OUT_OF_PLACE:
00152           next[QUEUE_COUNTER] = enqueue[QUEUE_COUNTER] + 1;
00153           gl_api.atomic_dcas_entry( (volatile atom_t *) qs->enqueue, (atom_t *) next, (atom_t *) enqueue );
00154         break;
00155 
        /* the ends differ: copy out the successor's user_data, then try
           to move qs->dequeue onto the successor; only a result of 1
           ends the loop
        */
00156         case QUEUE_STATE_ATTEMPT_DEQUEUE:
00157           *user_data = next[QUEUE_POINTER]->user_data;
00158 
00159           next[QUEUE_COUNTER] = dequeue[QUEUE_COUNTER] + 1;
00160           cas_result = gl_api.atomic_dcas_entry( (volatile atom_t *) qs->dequeue, (atom_t *) next, (atom_t *) dequeue );
00161 
00162           if( cas_result == 1 )
00163             finished_flag = RAISED;
00164         break;
00165       }
00166     }
00167   }
00168   while( finished_flag == LOWERED );
00169 
  /* cas_result is 1 only when the loop was left through a successful
     dequeue attempt; in that case give the element that qs->dequeue used
     to point to back to the freelist
  */
00170   if( cas_result == 1 )
00171     freelist_push( qs->fs, dequeue[QUEUE_POINTER]->fe );
00172 
00173   return( rv );
00174 }
00175 
00176 #endif /* HLX_BUILD_WITH_PARALLEL_THREADING */
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Defines