This is the mail archive of the
java-patches@gcc.gnu.org
mailing list for the Java project.
[ecj] Implement java.util.concurrent support
- From: Andrew Haley <aph at redhat dot com>
- To: java-patches at gcc dot gnu dot org
- Date: Tue, 12 Sep 2006 18:11:59 +0100
- Subject: [ecj] Implement java.util.concurrent support
This patch implements Unsafe.park() and Unsafe.unpark(), the low-level
suspend/resume primitives used by java.util.concurrent. Unlike
Thread.suspend() and Thread.resume(), these are not prone to race
conditions.
I have also fixed a bug in Thread.interrupt(). This bug caused a
thread that was holding its own lock to be uninterruptible.
Some of this code is in the wrong place: the low-level code needs to
be moved into posix-threads.cc and a suitable interface added, so that
this can be used on non-POSIX systems. Then, the park() and unpark()
methods may be moved to Thread.
2006-09-12 Andrew Haley <aph@redhat.com>
* java/lang/Thread.java (alive_flag): Make it a byte.
(THREAD_DEAD, THREAD_ALIVE, THREAD_SIGNALED): New constants.
(parkPermit): New variable.
(data): Now package private.
(Thread): Set alive_flag to THREAD_DEAD, not false.
(isAlive): Test against THREAD_DEAD.
* java/lang/natThread.cc (initialize_native): Initialize
park_mutex and park_cond.
(finish_): Set parkPermit to THREAD_PARK_DEAD.
(interrupt): Rewrite. Use an atomic update rather than a mutex to
access alive_flag.
If the thread is parked, unpark it.
(start): Set alive_flag to THREAD_ALIVE, not true.
* sun/misc/natUnsafe.cc (unpark): New method.
(park): New method.
* include/jvm.h (struct natThread): Moved here.
Index: java/lang/Thread.java
===================================================================
--- java/lang/Thread.java (revision 116852)
+++ java/lang/Thread.java (working copy)
@@ -127,7 +127,16 @@
private int priority;
boolean interrupt_flag;
- private boolean alive_flag;
+
+ /** A thread is either alive, dead, or being sent a signal; if it is
+ being sent a signal, it is also alive. Thus, if you want to
+ know if a thread is alive, it is sufficient to test
+ alive_flag != THREAD_DEAD. */
+ private static final byte THREAD_DEAD = 0;
+ private static final byte THREAD_ALIVE = 1;
+ private static final byte THREAD_SIGNALED = 2;
+ private volatile byte alive_flag;
+
private boolean startable_flag;
/** The context classloader for this Thread. */
@@ -156,8 +165,13 @@
*/
private Object parkBlocker;
- /** Used by Unsafe.park and Unsafe.unpark. */
- int parkPermit = 1;
+ /** Used by Unsafe.park and Unsafe.unpark. See Unsafe for a full
+ description. */
+ static final byte THREAD_PARK_RUNNING = 0;
+ static final byte THREAD_PARK_PERMIT = 1;
+ static final byte THREAD_PARK_PARKED = 2;
+ static final byte THREAD_PARK_DEAD = 3;
+ byte parkPermit;
// This describes the top-most interpreter frame for this thread.
RawData interp_frame;
@@ -166,7 +180,7 @@
volatile int state;
// Our native data - points to an instance of struct natThread.
- private RawDataManaged data;
+ RawDataManaged data;
/**
* Allocates a new <code>Thread</code> object. This constructor has
@@ -381,7 +395,7 @@
data = null;
interrupt_flag = false;
- alive_flag = false;
+ alive_flag = THREAD_DEAD;
startable_flag = true;
if (current != null)
@@ -594,7 +608,7 @@
*/
public final synchronized boolean isAlive()
{
- return alive_flag;
+ return alive_flag != THREAD_DEAD;
}
/**
Index: java/lang/natThread.cc
===================================================================
--- java/lang/natThread.cc (revision 116678)
+++ java/lang/natThread.cc (working copy)
@@ -34,24 +34,6 @@
-// This structure is used to represent all the data the native side
-// needs. An object of this type is assigned to the `data' member of
-// the Thread class.
-struct natThread
-{
- // These are used to interrupt sleep and join calls. We can share a
- // condition variable here since it only ever gets notified when the thread
- // exits.
- _Jv_Mutex_t join_mutex;
- _Jv_ConditionVariable_t join_cond;
-
- // This is private data for the thread system layer.
- _Jv_Thread_t *thread;
-
- // Each thread has its own JNI object.
- JNIEnv *jni_env;
-};
-
static void finalize_native (jobject ptr);
// This is called from the constructor to initialize the native side
@@ -70,6 +52,10 @@
_Jv_MutexInit (&nt->join_mutex);
_Jv_CondInit (&nt->join_cond);
+
+ pthread_mutex_init (&nt->park_mutex, NULL);
+ pthread_cond_init (&nt->park_cond, NULL);
+
nt->thread = _Jv_ThreadInitData (this);
// FIXME: if JNI_ENV is set we will want to free it. It is
// malloc()d.
@@ -87,7 +73,10 @@
#ifdef _Jv_HaveMutexDestroy
_Jv_MutexDestroy (&nt->join_mutex);
#endif
- _Jv_FreeJNIEnv(nt->jni_env);
+ _Jv_FreeJNIEnv((JNIEnv*)nt->jni_env);
+
+ pthread_mutex_destroy (&nt->park_mutex);
+ pthread_cond_destroy (&nt->park_cond);
}
jint
@@ -119,10 +108,29 @@
java::lang::Thread::interrupt (void)
{
checkAccess ();
- natThread *nt = (natThread *) data;
- JvSynchronize sync (this);
- if (alive_flag)
- _Jv_ThreadInterrupt (nt->thread);
+
+ // If a thread is in state ALIVE, we atomically set it to state
+ // SIGNALED and send it a signal. Once we've sent it the signal, we
+ // set its state back to ALIVE.
+ if (__sync_bool_compare_and_swap
+ (&alive_flag, Thread::THREAD_ALIVE, Thread::THREAD_SIGNALED))
+ {
+ natThread *nt = (natThread *) data;
+
+ _Jv_ThreadInterrupt (nt->thread);
+ __sync_bool_compare_and_swap
+ (&alive_flag, THREAD_SIGNALED, Thread::THREAD_ALIVE);
+
+ // Even though we've interrupted this thread, it might still be
+ // parked.
+ if (__sync_bool_compare_and_swap
+ (&parkPermit, Thread::THREAD_PARK_PARKED, Thread::THREAD_PARK_RUNNING))
+ {
+ pthread_mutex_lock (&nt->park_mutex);
+ pthread_cond_signal (&nt->park_cond);
+ pthread_mutex_unlock (&nt->park_mutex);
+ }
+ }
}
void
@@ -201,6 +209,8 @@
void
java::lang::Thread::finish_ ()
{
+ parkPermit = THREAD_PARK_DEAD;
+ __sync_synchronize();
natThread *nt = (natThread *) data;
group->removeThread (this);
@@ -230,7 +240,7 @@
{
JvSynchronize sync (this);
- alive_flag = false;
+ alive_flag = THREAD_DEAD;
state = JV_TERMINATED;
}
@@ -332,7 +342,7 @@
if (!startable_flag)
throw new IllegalThreadStateException;
- alive_flag = true;
+ alive_flag = THREAD_ALIVE;
startable_flag = false;
state = JV_RUNNABLE;
natThread *nt = (natThread *) data;
@@ -424,7 +434,7 @@
java::lang::Thread *t = _Jv_ThreadCurrent ();
if (t == NULL)
return NULL;
- return ((natThread *) t->data)->jni_env;
+ return (JNIEnv *)((natThread *) t->data)->jni_env;
}
void
@@ -445,7 +455,7 @@
if (thread == NULL || thread->startable_flag == false)
return -1;
thread->startable_flag = false;
- thread->alive_flag = true;
+ thread->alive_flag = ::java::lang::Thread::THREAD_ALIVE;
thread->state = JV_RUNNABLE;
natThread *nt = (natThread *) thread->data;
_Jv_ThreadRegister (nt->thread);
Index: include/jvm.h
===================================================================
--- include/jvm.h (revision 116888)
+++ include/jvm.h (working copy)
@@ -732,7 +732,7 @@
// not be a Thread available.
JvSetThreadState(::java::lang::Thread *cthread, JvThreadState nstate)
: thread (cthread),
- saved (cthread ? cthread->state : JV_NEW)
+ saved (cthread ? cthread->state : (jint)JV_NEW)
{
if (thread)
thread->state = nstate;
@@ -745,4 +745,26 @@
}
};
+// This structure is used to represent all the data the native side
+// needs. An object of this type is assigned to the `data' member of
+// the Thread class.
+struct natThread
+{
+ // These are used to interrupt sleep and join calls. We can share a
+ // condition variable here since it only ever gets notified when the thread
+ // exits.
+ _Jv_Mutex_t join_mutex;
+ _Jv_ConditionVariable_t join_cond;
+
+ // Used by Unsafe.park() and Unsafe.unpark(), and by Thread.interrupt() to wake a parked thread.
+ pthread_mutex_t park_mutex;
+ pthread_cond_t park_cond;
+
+ // This is private data for the thread system layer.
+ _Jv_Thread_t *thread;
+
+ // Each thread has its own JNI object; held as void* and cast to JNIEnv* where it is used.
+ void *jni_env;
+};
+};
+
#endif /* __JAVA_JVM_H__ */
Index: sun/misc/natUnsafe.cc
===================================================================
--- sun/misc/natUnsafe.cc (revision 116852)
+++ sun/misc/natUnsafe.cc (working copy)
@@ -155,11 +155,97 @@
void
sun::misc::Unsafe::unpark (::java::lang::Thread *thread)
{
- return;
+ using namespace ::java::lang;
+ volatile jbyte *ptr = &thread->parkPermit;
+
+ /* If this thread is in state RUNNING, give it a permit, which makes
+ its next call to park() return immediately, and we are done. */
+ if (__sync_bool_compare_and_swap
+ (ptr, Thread::THREAD_PARK_RUNNING, Thread::THREAD_PARK_PERMIT))
+ return;
+
+ /* If this thread is parked, put it into state RUNNING and signal its
+ condition variable; in any other state (PERMIT, DEAD) do nothing. */
+ if (__sync_bool_compare_and_swap
+ (ptr, Thread::THREAD_PARK_PARKED, Thread::THREAD_PARK_RUNNING))
+ {
+ natThread *nt = (natThread *) thread->data;
+ pthread_mutex_lock (&nt->park_mutex);
+ pthread_cond_signal (&nt->park_cond);
+ pthread_mutex_unlock (&nt->park_mutex);
+ }
}
void
sun::misc::Unsafe::park (jboolean isAbsolute, jlong time)
{
-// ::java::lang::Thread::yield ();
+  /* Block the current thread until it is unparked, or until TIME
+     expires: TIME is a deadline in milliseconds since the epoch if
+     ISABSOLUTE, else a relative timeout in nanoseconds (0 = none).  */
+  using namespace ::java::lang;
+  Thread *thread = Thread::currentThread();
+  volatile jbyte *ptr = &thread->parkPermit;
+
+  /* If we have a permit, consume it and return immediately.  */
+  if (__sync_bool_compare_and_swap
+      (ptr, Thread::THREAD_PARK_PERMIT, Thread::THREAD_PARK_RUNNING))
+    return;
+
+  /* A negative TIME or an absolute deadline of zero has already passed;
+     callers must tolerate early returns from park(), so just return.  */
+  if (time < 0 || (isAbsolute && time == 0))
+    return;
+
+  struct timespec ts;
+  bool timed = false;
+
+  if (time > 0)
+    {
+      // Calculate the abstime corresponding to the timeout.  We use
+      // `unsigned long long' rather than jlong because our caller may
+      // pass up to Long.MAX_VALUE, which can overflow a timespec.
+      unsigned long long t = (unsigned long long) time;
+      unsigned long long seconds, nsecs;
+      if (isAbsolute)
+        {
+          seconds = t / 1000;
+          nsecs = (t % 1000) * 1000000;
+        }
+      else
+        {
+          unsigned long long now = java::lang::System::currentTimeMillis();
+          seconds = now / 1000 + t / 1000000000;
+          nsecs = (now % 1000) * 1000000 + t % 1000000000;
+          // Carry whole seconds so that tv_nsec stays below one second,
+          // as pthread_cond_timedwait requires.
+          seconds += nsecs / 1000000000;
+          nsecs %= 1000000000;
+        }
+
+      // We treat a timeout that won't fit into a struct timespec as
+      // a wait forever, leaving TIMED false.
+      ts.tv_sec = seconds;
+      if (ts.tv_sec >= 0 && (unsigned long long) ts.tv_sec == seconds)
+        {
+          ts.tv_nsec = nsecs;
+          timed = true;
+        }
+    }
+
+  natThread *nt = (natThread *) thread->data;
+  pthread_mutex_lock (&nt->park_mutex);
+  if (__sync_bool_compare_and_swap
+      (ptr, Thread::THREAD_PARK_RUNNING, Thread::THREAD_PARK_PARKED))
+    {
+      if (timed)
+        pthread_cond_timedwait (&nt->park_cond, &nt->park_mutex, &ts);
+      else
+        pthread_cond_wait (&nt->park_cond, &nt->park_mutex);
+      /* If we were unparked by some other thread, this will already
+         be in state THREAD_PARK_RUNNING.  If we timed out, we have to
+         do it ourselves.  */
+      __sync_bool_compare_and_swap
+        (ptr, Thread::THREAD_PARK_PARKED, Thread::THREAD_PARK_RUNNING);
+    }
+  pthread_mutex_unlock (&nt->park_mutex);
}