This is the mail archive of the java-patches@gcc.gnu.org mailing list for the Java project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

[ecj] Implement java.util.concurrent support


This patch implements Unsafe.park() and Unsafe.unpark(), the low-level
suspend/resume primitives used by java.util.concurrent.  Unlike
Thread.suspend() and Thread.resume(), these are not prone to race
conditions.

I have also fixed a bug in Thread.interrupt().  This bug caused a
thread that was holding its own lock to be uninterruptible.

Some of this code is in the wrong place: the low-level code needs to
be moved into posix-threads.cc and a suitable interface added, so that
this can be used on non-POSIX systems.  Then, the park() and unpark()
methods may be moved to Thread.

2006-09-12  Andrew Haley  <aph@redhat.com>

	* java/lang/Thread.java (alive_flag): Make it a byte.
	(THREAD_DEAD, THREAD_ALIVE, THREAD_SIGNALED): new constants.
	(parkPermit): New variable.
	(data): Now package private.
	(Thread): Set alive_flag to THREAD_DEAD, not false.
	(isAlive): Test against THREAD_DEAD.
	* java/lang/natThread.cc (initialize_native): Initialize
	park_mutex and park_cond.
	(finish_): Set parkPermit to THREAD_PARK_DEAD.
	(interrupt): Rewrite.  Use an atomic update rather than a mutex to
	access alive_flag.
	If the thread is parked, unpark it.
	(start): Set alive_flag to THREAD_ALIVE, not true.
	* sun/misc/natUnsafe.cc (unpark): New method.
	(park): New method.
	* include/jvm.h (struct natThread): Moved here.

Index: java/lang/Thread.java
===================================================================
--- java/lang/Thread.java	(revision 116852)
+++ java/lang/Thread.java	(working copy)
@@ -127,7 +127,16 @@
   private int priority;
 
   boolean interrupt_flag;
-  private boolean alive_flag;
+
+  /** A thread is either alive, dead, or being sent a signal; if it is
+      being sent a signal, it is also alive.  Thus, if you want to
+      know if a thread is alive, it is sufficient to test 
+      alive_flag != THREAD_DEAD. */
+  private static final byte THREAD_DEAD = 0;
+  private static final byte THREAD_ALIVE = 1;
+  private static final byte THREAD_SIGNALED = 2;
+  private volatile byte alive_flag;
+
   private boolean startable_flag;
 
   /** The context classloader for this Thread. */
@@ -156,8 +165,13 @@
    */
   private Object parkBlocker;
 
-  /** Used by Unsafe.park and Unsafe.unpark.  */
-  int parkPermit = 1;
+  /** Used by Unsafe.park and Unsafe.unpark.  See Unsafe for a full
+      description.  */
+  static final byte THREAD_PARK_RUNNING = 0;
+  static final byte THREAD_PARK_PERMIT = 1;
+  static final byte THREAD_PARK_PARKED = 2;
+  static final byte THREAD_PARK_DEAD = 3;
+  byte  parkPermit;
 
   // This describes the top-most interpreter frame for this thread.
   RawData interp_frame;
@@ -166,7 +180,7 @@
   volatile int state;
 
   // Our native data - points to an instance of struct natThread.
-  private RawDataManaged data;
+  RawDataManaged data;
 
   /**
    * Allocates a new <code>Thread</code> object. This constructor has
@@ -381,7 +395,7 @@
       
     data = null;
     interrupt_flag = false;
-    alive_flag = false;
+    alive_flag = THREAD_DEAD;
     startable_flag = true;
 
     if (current != null)
@@ -594,7 +608,7 @@
    */
   public final synchronized boolean isAlive()
   {
-    return alive_flag;
+    return alive_flag != THREAD_DEAD;
   }
 
   /**
Index: java/lang/natThread.cc
===================================================================
--- java/lang/natThread.cc	(revision 116678)
+++ java/lang/natThread.cc	(working copy)
@@ -34,24 +34,6 @@
 
 
 
-// This structure is used to represent all the data the native side
-// needs.  An object of this type is assigned to the `data' member of
-// the Thread class.
-struct natThread
-{
-  // These are used to interrupt sleep and join calls.  We can share a
-  // condition variable here since it only ever gets notified when the thread
-  // exits.
-  _Jv_Mutex_t join_mutex;
-  _Jv_ConditionVariable_t join_cond;
-
-  // This is private data for the thread system layer.
-  _Jv_Thread_t *thread;
-
-  // Each thread has its own JNI object.
-  JNIEnv *jni_env;
-};
-
 static void finalize_native (jobject ptr);
 
 // This is called from the constructor to initialize the native side
@@ -70,6 +52,10 @@
 
   _Jv_MutexInit (&nt->join_mutex);
   _Jv_CondInit (&nt->join_cond);
+
+  pthread_mutex_init (&nt->park_mutex, NULL);
+  pthread_cond_init (&nt->park_cond, NULL);
+
   nt->thread = _Jv_ThreadInitData (this);
   // FIXME: if JNI_ENV is set we will want to free it.  It is
   // malloc()d.
@@ -87,7 +73,10 @@
 #ifdef _Jv_HaveMutexDestroy
   _Jv_MutexDestroy (&nt->join_mutex);
 #endif
-  _Jv_FreeJNIEnv(nt->jni_env);
+  _Jv_FreeJNIEnv((JNIEnv*)nt->jni_env);
+
+  pthread_mutex_destroy (&nt->park_mutex);
+  pthread_cond_destroy (&nt->park_cond);
 }
 
 jint
@@ -119,10 +108,29 @@
 java::lang::Thread::interrupt (void)
 {
   checkAccess ();
-  natThread *nt = (natThread *) data;
-  JvSynchronize sync (this);
-  if (alive_flag)
-    _Jv_ThreadInterrupt (nt->thread);
+
+  // If a thread is in state ALIVE, we atomically set it to state
+  // SIGNALED and send it a signal.  Once we've sent it the signal, we
+  // set its state back to ALIVE.
+  if (__sync_bool_compare_and_swap 
+      (&alive_flag, Thread::THREAD_ALIVE, Thread::THREAD_SIGNALED))
+    {
+      natThread *nt = (natThread *) data;
+
+      _Jv_ThreadInterrupt (nt->thread);
+      __sync_bool_compare_and_swap 
+	(&alive_flag, THREAD_SIGNALED, Thread::THREAD_ALIVE);
+
+      // Even though we've interrupted this thread, it might still be
+      // parked.
+      if (__sync_bool_compare_and_swap 
+	  (&parkPermit, Thread::THREAD_PARK_PARKED, Thread::THREAD_PARK_RUNNING))
+	{
+	  pthread_mutex_lock (&nt->park_mutex);
+	  pthread_cond_signal (&nt->park_cond);
+	  pthread_mutex_unlock (&nt->park_mutex);
+	}
+    }
 }
 
 void
@@ -201,6 +209,8 @@
 void
 java::lang::Thread::finish_ ()
 {
+  parkPermit = THREAD_PARK_DEAD;
+  __sync_synchronize();
   natThread *nt = (natThread *) data;
   
   group->removeThread (this);
@@ -230,7 +240,7 @@
 
   {
     JvSynchronize sync (this);
-    alive_flag = false;
+    alive_flag = THREAD_DEAD;
     state = JV_TERMINATED;
   }
 
@@ -332,7 +342,7 @@
   if (!startable_flag)
     throw new IllegalThreadStateException;
 
-  alive_flag = true;
+  alive_flag = THREAD_ALIVE;
   startable_flag = false;
   state = JV_RUNNABLE;
   natThread *nt = (natThread *) data;
@@ -424,7 +434,7 @@
   java::lang::Thread *t = _Jv_ThreadCurrent ();
   if (t == NULL)
     return NULL;
-  return ((natThread *) t->data)->jni_env;
+  return (JNIEnv *)((natThread *) t->data)->jni_env;
 }
 
 void
@@ -445,7 +455,7 @@
   if (thread == NULL || thread->startable_flag == false)
     return -1;
   thread->startable_flag = false;
-  thread->alive_flag = true;
+  thread->alive_flag = ::java::lang::Thread::THREAD_ALIVE;
   thread->state = JV_RUNNABLE;
   natThread *nt = (natThread *) thread->data;
   _Jv_ThreadRegister (nt->thread);
Index: include/jvm.h
===================================================================
--- include/jvm.h	(revision 116888)
+++ include/jvm.h	(working copy)
@@ -732,7 +732,7 @@
   // not be a Thread available.
   JvSetThreadState(::java::lang::Thread *cthread, JvThreadState nstate)
     : thread (cthread),
-      saved (cthread ? cthread->state : JV_NEW)
+      saved (cthread ? cthread->state : (jint)JV_NEW)
   {
     if (thread)
       thread->state = nstate;
@@ -745,4 +745,26 @@
   }
 };
 
+// This structure is used to represent all the data the native side
+// needs.  An object of this type is assigned to the `data' member of
+// the Thread class.
+struct natThread
+{
+  // These are used to interrupt sleep and join calls.  We can share a
+  // condition variable here since it only ever gets notified when the thread
+  // exits.
+  _Jv_Mutex_t join_mutex;
+  _Jv_ConditionVariable_t join_cond;
+
+  // These are used by Unsafe.park() and Unsafe.unpark().  
+  pthread_mutex_t park_mutex;
+  pthread_cond_t park_cond;
+
+  // This is private data for the thread system layer.
+  _Jv_Thread_t *thread;
+
+  // Each thread has its own JNI object.
+  void *jni_env;
+};
+
 #endif /* __JAVA_JVM_H__ */
Index: sun/misc/natUnsafe.cc
===================================================================
--- sun/misc/natUnsafe.cc	(revision 116852)
+++ sun/misc/natUnsafe.cc	(working copy)
@@ -155,11 +155,97 @@
 void
 sun::misc::Unsafe::unpark (::java::lang::Thread *thread)
 {
-  return;
+  using namespace ::java::lang;
+  volatile jbyte *ptr = &thread->parkPermit;
+
+  /* If this thread is in state RUNNING, give it a permit and return
+     immediately.  */
+  if (__sync_bool_compare_and_swap 
+      (ptr, Thread::THREAD_PARK_RUNNING, Thread::THREAD_PARK_PERMIT))
+    return;
+  
+  /* If this thread is parked, put it into state RUNNING and send it a
+     signal.  */
+  if (__sync_bool_compare_and_swap 
+      (ptr, Thread::THREAD_PARK_PARKED, Thread::THREAD_PARK_RUNNING))
+    {
+      natThread *nt = (natThread *) thread->data;
+      pthread_mutex_lock (&nt->park_mutex);
+      pthread_cond_signal (&nt->park_cond);
+      pthread_mutex_unlock (&nt->park_mutex);
+    }
 }
 
 void
 sun::misc::Unsafe::park (jboolean isAbsolute, jlong time)
 {
-//   ::java::lang::Thread::yield ();
+  using namespace ::java::lang;
+  Thread *thread = Thread::currentThread();
+  volatile jbyte *ptr = &thread->parkPermit;
+
+  /* If we have a permit, return immediately.  */
+  if (__sync_bool_compare_and_swap 
+      (ptr, Thread::THREAD_PARK_PERMIT, Thread::THREAD_PARK_RUNNING))
+    return;
+
+  struct timespec ts;
+  jlong millis = 0, nanos = 0;
+
+  if (time)
+    {
+      if (isAbsolute)
+	{
+	  millis = time;
+	  nanos = 0;
+	}
+      else
+	{
+	  millis = java::lang::System::currentTimeMillis();
+	  nanos = time;
+	}
+
+      if (millis > 0 || nanos > 0)
+	{
+	  // Calculate the abstime corresponding to the timeout.
+	  // Everything is in milliseconds.
+	  //
+	  // We use `unsigned long long' rather than jlong because our
+	  // caller may pass up to Long.MAX_VALUE millis.  This would
+	  // overflow the range of a timespec.
+
+	  unsigned long long m = (unsigned long long)millis;
+	  unsigned long long seconds = m / 1000; 
+
+	  ts.tv_sec = seconds;
+	  if (ts.tv_sec < 0 || (unsigned long long)ts.tv_sec != seconds)
+	    {
+	      // We treat a timeout that won't fit into a struct timespec
+	      // as a wait forever.
+	      millis = nanos = 0;
+	    }
+	  else
+	    {
+	      m %= 1000;
+	      ts.tv_nsec = m * 1000000 + (unsigned long long)nanos;
+	    }
+	}
+    }
+      
+  natThread *nt = (natThread *) thread->data;
+  pthread_mutex_lock (&nt->park_mutex);
+  if (__sync_bool_compare_and_swap 
+      (ptr, Thread::THREAD_PARK_RUNNING, Thread::THREAD_PARK_PARKED))
+    {
+      if (millis == 0 && nanos == 0)
+	pthread_cond_wait (&nt->park_cond, &nt->park_mutex);
+      else
+	pthread_cond_timedwait (&nt->park_cond, &nt->park_mutex, 
+					&ts);
+      /* If we were unparked by some other thread, this will already
+	 be in state THREAD_PARK_RUNNING.  If we timed out, we have to
+	 do it ourself.  */
+      __sync_bool_compare_and_swap 
+	(ptr, Thread::THREAD_PARK_PARKED, Thread::THREAD_PARK_RUNNING);
+    }
+  pthread_mutex_unlock (&nt->park_mutex);
 }


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]