1 /* Copyright (C) 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2006 Free Software Foundation, Inc.
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 /* $Id: coop.c,v 1.38.2.1 2006-02-12 13:42:51 mvo Exp $ */
21 /* Cooperative thread library, based on QuickThreads */
36 #include "libguile/eval.h"
38 \f/* #define COOP_STKSIZE (0x10000) */
39 #define COOP_STKSIZE (scm_eval_stack)
41 /* `alignment' must be a power of 2. */
42 #define COOP_STKALIGN(sp, alignment) \
43 ((void *)((((qt_word_t)(sp)) + (alignment) - 1) & ~((alignment)-1)))
47 /* Queue access functions. */
50 coop_qinit (coop_q_t *q)
52 q->t.next = q->tail = &q->t;
59 q->t.exceptfds = NULL;
65 coop_qget (coop_q_t *q)
74 { /* If it was already empty .. */
75 return NULL; /* .. say so. */
77 q->tail = &q->t; /* Else now it is empty. */
84 coop_qput (coop_q_t *q, coop_t *t)
92 coop_all_qput (coop_q_t *q, coop_t *t)
95 q->t.all_next->all_prev = t;
97 t->all_next = q->t.all_next;
102 coop_all_qremove (coop_q_t *q, coop_t *t)
105 t->all_prev->all_next = t->all_next;
107 q->t.all_next = t->all_next;
109 t->all_next->all_prev = t->all_prev;
112 /* Insert thread t into the ordered queue q.
113 q is ordered after wakeup_time. Threads which aren't sleeping but
114 waiting for I/O go last into the queue. */
116 coop_timeout_qinsert (coop_q_t *q, coop_t *t)
118 coop_t *pred = &q->t;
119 int sec = t->wakeup_time.tv_sec;
120 int usec = t->wakeup_time.tv_usec;
121 while (pred->next != &q->t
122 && pred->next->timeoutp
123 && (pred->next->wakeup_time.tv_sec < sec
124 || (pred->next->wakeup_time.tv_sec == sec
125 && pred->next->wakeup_time.tv_usec < usec)))
127 t->next = pred->next;
129 if (t->next == &q->t)
135 /* Thread routines. */
137 coop_q_t coop_global_runq; /* A queue of runable threads. */
138 coop_q_t coop_global_sleepq; /* A queue of sleeping threads. */
139 coop_q_t coop_tmp_queue; /* A temp working queue */
140 coop_q_t coop_global_allq; /* A queue of all threads. */
141 static coop_t coop_global_main; /* Thread for the process. */
142 coop_t *coop_global_curr; /* Currently-executing thread. */
144 #ifdef GUILE_PTHREAD_COMPAT
145 static coop_q_t coop_deadq;
146 static int coop_quitting_p = -1;
147 static pthread_cond_t coop_cond_quit;
148 static pthread_cond_t coop_cond_create;
149 static pthread_mutex_t coop_mutex_create;
150 static pthread_t coop_mother;
151 static int mother_awake_p = 0;
152 static coop_t *coop_child;
155 static void *coop_starthelp (qt_t *old, void *ignore0, void *ignore1);
156 static void coop_only (void *pu, void *pt, qt_userf_t *f);
157 static void *coop_aborthelp (qt_t *sp, void *old, void *null);
158 static void *coop_yieldhelp (qt_t *sp, void *old, void *blockq);
161 /* called on process termination. */
167 extern int on_exit (void (*procp) (), int arg);
170 coop_finish (int status, void *arg)
172 #error Dont know how to setup a cleanup handler on your system.
176 #ifdef GUILE_PTHREAD_COMPAT
178 pthread_cond_signal (&coop_cond_create);
179 pthread_cond_broadcast (&coop_cond_quit);
186 coop_qinit (&coop_global_runq);
187 coop_qinit (&coop_global_sleepq);
188 coop_qinit (&coop_tmp_queue);
189 coop_qinit (&coop_global_allq);
190 coop_global_curr = &coop_global_main;
191 #ifdef GUILE_PTHREAD_COMPAT
192 coop_qinit (&coop_deadq);
193 pthread_cond_init (&coop_cond_quit, NULL);
194 pthread_cond_init (&coop_cond_create, NULL);
195 pthread_mutex_init (&coop_mutex_create, NULL);
198 atexit (coop_finish);
201 on_exit (coop_finish, 0);
211 while ((next = coop_qget (&coop_global_runq)) != NULL) {
212 coop_global_curr = next;
213 QT_BLOCK (coop_starthelp, 0, 0, next->sp);
219 coop_starthelp (qt_t *old, void *ignore0, void *ignore1)
221 coop_global_main.sp = old;
222 coop_global_main.joining = NULL;
223 coop_qput (&coop_global_runq, &coop_global_main);
224 return NULL; /* not used, but keeps compiler happy */
228 coop_mutex_init (coop_m *m)
230 return coop_new_mutex_init (m, NULL);
234 coop_new_mutex_init (coop_m *m, coop_mattr *attr)
238 coop_qinit(&(m->waiting));
243 coop_mutex_trylock (coop_m *m)
245 if (m->owner == NULL)
247 m->owner = coop_global_curr;
250 else if (m->owner == coop_global_curr)
260 coop_mutex_lock (coop_m *m)
262 if (m->owner == NULL)
264 m->owner = coop_global_curr;
266 else if (m->owner == coop_global_curr)
272 coop_t *old, *newthread;
274 /* Record the current top-of-stack before going to sleep */
275 coop_global_curr->top = &old;
277 newthread = coop_wait_for_runnable_thread();
278 if (newthread == coop_global_curr)
280 old = coop_global_curr;
281 coop_global_curr = newthread;
282 QT_BLOCK (coop_yieldhelp, old, &(m->waiting), newthread->sp);
289 coop_mutex_unlock (coop_m *m)
291 coop_t *old, *newthread;
295 newthread = coop_qget (&(m->waiting));
296 if (newthread != NULL)
298 /* Record the current top-of-stack before going to sleep */
299 coop_global_curr->top = &old;
301 old = coop_global_curr;
302 coop_global_curr = newthread;
303 /* The new thread came into m->waiting through a lock operation.
304 It now owns this mutex. */
305 m->owner = coop_global_curr;
306 QT_BLOCK (coop_yieldhelp, old, &coop_global_runq, newthread->sp);
313 else if (m->level > 0)
323 coop_mutex_destroy (coop_m *m)
330 coop_condition_variable_init (coop_c *c)
332 return coop_new_condition_variable_init (c, NULL);
336 coop_new_condition_variable_init (coop_c *c, coop_cattr *a)
338 coop_qinit(&(c->waiting));
343 coop_condition_variable_wait_mutex (coop_c *c, coop_m *m)
345 coop_t *old, *newthread;
347 /* coop_mutex_unlock (m); */
348 newthread = coop_qget (&(m->waiting));
349 if (newthread != NULL)
351 m->owner = newthread;
356 /*fixme* Should we really wait here? Isn't it OK just to proceed? */
357 newthread = coop_wait_for_runnable_thread();
358 if (newthread == coop_global_curr)
361 coop_global_curr->top = &old;
362 old = coop_global_curr;
363 coop_global_curr = newthread;
364 QT_BLOCK (coop_yieldhelp, old, &(c->waiting), newthread->sp);
371 coop_condition_variable_timed_wait_mutex (coop_c *c,
373 const scm_t_timespec *abstime)
378 #elif defined (WSAETIMEDOUT)
379 int res = WSAETIMEDOUT;
384 /* coop_mutex_unlock (m); */
385 t = coop_qget (&(m->waiting));
393 coop_global_curr->timeoutp = 1;
394 coop_global_curr->wakeup_time.tv_sec = abstime->tv_sec;
395 coop_global_curr->wakeup_time.tv_usec = abstime->tv_nsec / 1000;
396 coop_timeout_qinsert (&coop_global_sleepq, coop_global_curr);
397 t = coop_wait_for_runnable_thread();
399 if (t != coop_global_curr)
401 coop_global_curr->top = &old;
402 old = coop_global_curr;
403 coop_global_curr = t;
404 QT_BLOCK (coop_yieldhelp, old, &(c->waiting), t->sp);
406 /* Are we still in the sleep queue? */
407 old = &coop_global_sleepq.t;
408 for (t = old->next; t != &coop_global_sleepq.t; old = t, t = t->next)
409 if (t == coop_global_curr)
411 old->next = t->next; /* unlink */
421 coop_condition_variable_broadcast (coop_c *c)
425 while ((newthread = coop_qget (&(c->waiting))) != NULL)
427 coop_qput (&coop_global_runq, newthread);
433 coop_condition_variable_signal (coop_c *c)
435 return coop_condition_variable_broadcast (c);
442 static int n_keys = 0;
443 static int max_keys = 0;
444 static void (**destructors) (void *) = 0;
447 coop_key_create (coop_k *keyp, void (*destructor) (void *value))
449 if (n_keys >= max_keys)
452 max_keys = max_keys ? max_keys * 3 / 2 : 10;
453 destructors = realloc (destructors, sizeof (void *) * max_keys);
454 if (destructors == 0)
456 fprintf (stderr, "Virtual memory exceeded in coop_key_create\n");
459 for (i = n_keys; i < max_keys; ++i)
460 destructors[i] = NULL;
462 destructors[n_keys] = destructor;
468 coop_setspecific (coop_k key, const void *value)
470 int n_keys = coop_global_curr->n_keys;
474 coop_global_curr->n_keys = max_keys;
475 coop_global_curr->specific = realloc (n_keys
476 ? coop_global_curr->specific
478 sizeof (void *) * max_keys);
479 if (coop_global_curr->specific == 0)
481 fprintf (stderr, "Virtual memory exceeded in coop_setspecific\n");
484 for (i = n_keys; i < max_keys; ++i)
485 coop_global_curr->specific[i] = NULL;
487 coop_global_curr->specific[key] = (void *) value;
492 coop_getspecific (coop_k key)
494 return (key < coop_global_curr->n_keys
495 ? coop_global_curr->specific[key]
500 coop_key_delete (coop_k key)
507 coop_condition_variable_destroy (coop_c *c)
512 #ifdef GUILE_PTHREAD_COMPAT
514 /* 1K room for the cond wait routine */
515 #if SCM_STACK_GROWS_UP
516 # define COOP_STACK_ROOM (256)
518 # define COOP_STACK_ROOM (-256)
522 dummy_start (void *coop_thread)
524 coop_t *t = (coop_t *) coop_thread;
526 t->sp = (qt_t *) (&t + COOP_STACK_ROOM);
527 pthread_mutex_init (&t->dummy_mutex, NULL);
528 pthread_mutex_lock (&t->dummy_mutex);
531 res = pthread_cond_wait (&coop_cond_quit, &t->dummy_mutex);
532 while (res == EINTR);
539 pthread_mutex_lock (&coop_mutex_create);
540 while (!coop_quitting_p)
543 pthread_create (&coop_child->dummy_thread,
549 res = pthread_cond_wait (&coop_cond_create, &coop_mutex_create);
550 while (res == EINTR);
558 coop_create (coop_userf_t *f, void *pu)
561 #ifndef GUILE_PTHREAD_COMPAT
565 #ifdef GUILE_PTHREAD_COMPAT
566 t = coop_qget (&coop_deadq);
576 t = scm_malloc (sizeof (coop_t));
579 #ifdef GUILE_PTHREAD_COMPAT
582 if (coop_quitting_p < 0)
585 /* We can't create threads ourselves since the pthread
586 * corresponding to this stack might be sleeping.
588 pthread_create (&coop_mother, NULL, mother, NULL);
592 pthread_cond_signal (&coop_cond_create);
594 /* We can't use a pthreads condition variable since "this"
595 * pthread could already be asleep. We can't use a COOP
596 * condition variable because they are not safe against
597 * pre-emptive switching.
599 while (coop_child || mother_awake_p)
602 t->sto = scm_malloc (COOP_STKSIZE);
603 sto = COOP_STKALIGN (t->sto, QT_STKALIGN);
604 t->sp = QT_SP (sto, COOP_STKSIZE - QT_STKALIGN);
608 t->sp = QT_ARGS (t->sp, pu, t, (qt_userf_t *)f, coop_only);
610 coop_qput (&coop_global_runq, t);
611 coop_all_qput (&coop_global_allq, t);
618 coop_only (void *pu, void *pt, qt_userf_t *f)
620 coop_global_curr = (coop_t *)pt;
621 (*(coop_userf_t *)f)(pu);
630 coop_t *old, *newthread;
632 /* Wake up any threads that are waiting to join this one */
633 if (coop_global_curr->joining)
635 while ((newthread = coop_qget ((coop_q_t *)(coop_global_curr->joining)))
638 coop_qput (&coop_global_runq, newthread);
640 free (coop_global_curr->joining);
645 newthread = coop_wait_for_runnable_thread();
646 } while (newthread == coop_global_curr);
648 coop_all_qremove (&coop_global_allq, coop_global_curr);
649 old = coop_global_curr;
650 coop_global_curr = newthread;
651 QT_ABORT (coop_aborthelp, old, (void *) NULL, newthread->sp);
656 coop_aborthelp (qt_t *sp, void *old, void *null)
658 coop_t *oldthread = (coop_t *) old;
660 if (oldthread->specific)
661 free (oldthread->specific);
662 #ifndef GUILE_PTHREAD_COMPAT
663 free (oldthread->sto);
666 coop_qput (&coop_deadq, oldthread);
676 coop_t *old, *newthread;
678 /* Create a join list if necessary */
679 if (t->joining == NULL)
681 t->joining = scm_malloc(sizeof(coop_q_t));
682 coop_qinit((coop_q_t *) t->joining);
685 newthread = coop_wait_for_runnable_thread();
686 if (newthread == coop_global_curr)
688 old = coop_global_curr;
689 coop_global_curr = newthread;
690 QT_BLOCK (coop_yieldhelp, old, (coop_q_t *) t->joining, newthread->sp);
699 newthread = coop_next_runnable_thread();
701 /* There may be no other runnable threads. Return if this is the
703 if (newthread == coop_global_curr)
706 old = coop_global_curr;
708 coop_global_curr = newthread;
709 QT_BLOCK (coop_yieldhelp, old, &coop_global_runq, newthread->sp);
714 coop_yieldhelp (qt_t *sp, void *old, void *blockq)
716 ((coop_t *)old)->sp = sp;
717 coop_qput ((coop_q_t *)blockq, (coop_t *)old);
721 /* Replacement for the system's sleep() function. Does the right thing
722 for the process - but not for the system (it busy-waits) */
725 coop_sleephelp (qt_t *sp, void *old, void *blockq)
727 ((coop_t *)old)->sp = sp;
728 /* old is already on the sleep queue - so there's no need to
729 do anything extra here */
734 scm_thread_usleep (unsigned long usec)
736 struct timeval timeout;
738 timeout.tv_usec = usec;
739 scm_internal_select (0, NULL, NULL, NULL, &timeout);
740 return 0; /* Maybe we should calculate actual time slept,
741 but this is faster... :) */
745 scm_thread_sleep (unsigned long sec)
747 time_t now = time (NULL);
748 struct timeval timeout;
750 timeout.tv_sec = sec;
752 scm_internal_select (0, NULL, NULL, NULL, &timeout);
753 slept = time (NULL) - now;
754 return slept > sec ? 0 : sec - slept;