![]() |
Helixis 1.0
Task Programming API
|
00001 #include "queue_internal.h" 00002 #if defined(HLX_BUILD_WITH_PARALLEL_THREADING) 00003 extern hlx_api gl_api; 00004 00005 00006 00007 /****************************************************************************/ 00008 int queue_enqueue( struct queue_state *qs, void *user_data ) 00009 { 00010 ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element 00011 *qe[QUEUE_PAC_SIZE]; 00012 00013 /* TRD : user_data can be 0 */ 00014 00015 queue_internal_new_element_from_freelist( qs, qe, user_data ); 00016 00017 if( qe[QUEUE_POINTER] == 0 ) 00018 return( 0 ); 00019 00020 queue_internal_queue( qs, qe ); 00021 00022 return( 1 ); 00023 } 00024 00025 00026 00027 00028 00029 /****************************************************************************/ 00030 int queue_guaranteed_enqueue( struct queue_state *qs, void *user_data ) 00031 { 00032 ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element 00033 *qe[QUEUE_PAC_SIZE]; 00034 00035 /* TRD : user_data can be 0 */ 00036 00037 queue_internal_guaranteed_new_element_from_freelist( qs, qe, user_data ); 00038 00039 if( qe[QUEUE_POINTER] == 0 ) 00040 return( 0 ); 00041 00042 queue_internal_queue( qs, qe ); 00043 00044 return( 1 ); 00045 } 00046 00047 00048 00049 00050 00051 /****************************************************************************/ 00052 void queue_internal_queue( struct queue_state *qs, struct queue_element *qe[QUEUE_PAC_SIZE] ) 00053 { 00054 ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element 00055 *enqueue[QUEUE_PAC_SIZE], 00056 *next[QUEUE_PAC_SIZE]; 00057 00058 unsigned char 00059 cas_result = 0; 00060 00061 00062 do 00063 { 00064 enqueue[QUEUE_POINTER] = qs->enqueue[QUEUE_POINTER]; 00065 enqueue[QUEUE_COUNTER] = qs->enqueue[QUEUE_COUNTER]; 00066 00067 next[QUEUE_POINTER] = enqueue[QUEUE_POINTER]->next[QUEUE_POINTER]; 00068 next[QUEUE_COUNTER] = enqueue[QUEUE_POINTER]->next[QUEUE_COUNTER]; 00069 00070 /* TRD : this if() ensures that the next we read, just above, 00071 really is from qs->enqueue (which we copied into enqueue) 00072 */ 00073 00074 if( qs->enqueue[QUEUE_POINTER] == enqueue[QUEUE_POINTER] and qs->enqueue[QUEUE_COUNTER] == enqueue[QUEUE_COUNTER] ) 00075 { 00076 if( next[QUEUE_POINTER] == 0 ) 00077 { 00078 qe[QUEUE_COUNTER] = next[QUEUE_COUNTER] + 1; 00079 cas_result = gl_api.atomic_dcas_entry( (volatile atom_t *) enqueue[QUEUE_POINTER]->next, (atom_t *) qe, (atom_t *) next ); 00080 } 00081 else 00082 { 00083 next[QUEUE_COUNTER] = enqueue[QUEUE_COUNTER] + 1; 00084 gl_api.atomic_dcas_entry( (volatile atom_t *) qs->enqueue, (atom_t *) next, (atom_t *) enqueue ); 00085 } 00086 } 00087 } 00088 while( cas_result == 0 ); 00089 00090 qe[QUEUE_COUNTER] = enqueue[QUEUE_COUNTER] + 1; 00091 gl_api.atomic_dcas_entry( (volatile atom_t *) qs->enqueue, (atom_t *) qe, (atom_t *) enqueue ); 00092 00093 return; 00094 } 00095 00096 00097 00098 00099 00100 /****************************************************************************/ 00101 int queue_dequeue( struct queue_state *qs, void **user_data ) 00102 { 00103 ALIGN(ALIGN_DOUBLE_POINTER) struct queue_element 00104 *enqueue[QUEUE_PAC_SIZE], 00105 *dequeue[QUEUE_PAC_SIZE], 00106 *next[QUEUE_PAC_SIZE]; 00107 00108 unsigned char 00109 cas_result = 0; 00110 00111 int 00112 rv = 1, 00113 state = QUEUE_STATE_UNKNOWN, 00114 finished_flag = LOWERED; 00115 00116 00117 do 00118 { 00119 dequeue[QUEUE_POINTER] = qs->dequeue[QUEUE_POINTER]; 00120 dequeue[QUEUE_COUNTER] = qs->dequeue[QUEUE_COUNTER]; 00121 00122 enqueue[QUEUE_POINTER] = qs->enqueue[QUEUE_POINTER]; 00123 enqueue[QUEUE_COUNTER] = qs->enqueue[QUEUE_COUNTER]; 00124 00125 next[QUEUE_POINTER] = dequeue[QUEUE_POINTER]->next[QUEUE_POINTER]; 00126 next[QUEUE_COUNTER] = dequeue[QUEUE_POINTER]->next[QUEUE_COUNTER]; 00127 00128 /* TRD : confirm that dequeue didn't move between reading it 00129 and reading its next pointer 00130 */ 00131 00132 if( dequeue[QUEUE_POINTER] == qs->dequeue[QUEUE_POINTER] and dequeue[QUEUE_COUNTER] == qs->dequeue[QUEUE_COUNTER] ) 00133 { 00134 if( enqueue[QUEUE_POINTER] == dequeue[QUEUE_POINTER] and next[QUEUE_POINTER] == 0 ) 00135 state = QUEUE_STATE_EMPTY; 00136 00137 if( enqueue[QUEUE_POINTER] == dequeue[QUEUE_POINTER] and next[QUEUE_POINTER] != 0 ) 00138 state = QUEUE_STATE_ENQUEUE_OUT_OF_PLACE; 00139 00140 if( enqueue[QUEUE_POINTER] != dequeue[QUEUE_POINTER] ) 00141 state = QUEUE_STATE_ATTEMPT_DEQUEUE; 00142 00143 switch( state ) 00144 { 00145 case QUEUE_STATE_EMPTY: 00146 *user_data = 0; 00147 rv = 0; 00148 finished_flag = RAISED; 00149 break; 00150 00151 case QUEUE_STATE_ENQUEUE_OUT_OF_PLACE: 00152 next[QUEUE_COUNTER] = enqueue[QUEUE_COUNTER] + 1; 00153 gl_api.atomic_dcas_entry( (volatile atom_t *) qs->enqueue, (atom_t *) next, (atom_t *) enqueue ); 00154 break; 00155 00156 case QUEUE_STATE_ATTEMPT_DEQUEUE: 00157 *user_data = next[QUEUE_POINTER]->user_data; 00158 00159 next[QUEUE_COUNTER] = dequeue[QUEUE_COUNTER] + 1; 00160 cas_result = gl_api.atomic_dcas_entry( (volatile atom_t *) qs->dequeue, (atom_t *) next, (atom_t *) dequeue ); 00161 00162 if( cas_result == 1 ) 00163 finished_flag = RAISED; 00164 break; 00165 } 00166 } 00167 } 00168 while( finished_flag == LOWERED ); 00169 00170 if( cas_result == 1 ) 00171 freelist_push( qs->fs, dequeue[QUEUE_POINTER]->fe ); 00172 00173 return( rv ); 00174 } 00175 00176 #endif /* !HLX_BUILD_WITH_PARALLEL_THREADING */