[PATCH] for Review: SelectorImpl.wakeup() + Synchronization

Mohan Embar gnustuff@thisiscool.com
Thu Dec 18 18:10:00 GMT 2003


Hi All,

This patch attempts to implement Selector.wakeup() and
added extra synchronization in places where I thought
the specification:

http://java.sun.com/j2se/1.4.2/docs/api/java/nio/channels/Selector.html

...mandated it.

I've attached my updated NetTest testcase which tests
this in a cursory manner (but doesn't test multiple
threads waking the selector up and stuff like that).
The attached archive also has the output from both
Sun's JRE as well as post-patch i686-pc-linux-gnu with
this test. The MinGW version also seems to diff correctly
with Sun's JRE on Win32 with this patch.

There's a lot of multithreaded stuff and synchronization
going on here and I've also deviated from the spec as far
as the implementation of wakeup() and thread interruption,
choosing to do the inverse implementation under the covers
where wakeup() uses thread interruption instead of the
other way around. So a second set of eyes would be appreciated.

One last thing: a 100% correct implementation of select()
would have required atomically:

- releasing selectThreadMutex's monitor
- entering the select
- then, reacquiring selectThreadMutex's monitor after the select
  was done.

My implementation doesn't do this and there can be race conditions
which I've alluded to in the comments. I think it can be demonstrated,
though, that the race conditions that can occur due to this approximation
can occur with the ideal implementation with a slightly different
calling sequence of the threads, so this approximation is probably
okay.

Enjoy.

-- Mohan
http://www.thisiscool.com/
http://www.animalsong.org/

ChangeLog
2003-12-18  Mohan Embar  <gnustuff@thisiscool.com>

	* gnu/java/nio/SelectorImpl.java
	(selectThreadMutex): New field.
	(selectThread): New field.
	(unhandledWakeup): New field.
	(implCloseSelector): Added skeleton code which
	synchronizes as per Sun JRE JavaDoc.
	(keys): Throw ClosedSelectorException if selector
	is closed.
	(selectNow): Added comment that we're faking out
	an immediate select with a one-microsecond-timeout one.
	(select): Use 0 instead of -1 for infinite timeout.
	(implSelect): Changed comment in declaration.
	(select): Added synchronized to method declaration.
	Added synchronization and wakeup support as per Sun
	JRE JavaDoc. Removed begin and end calls.
	(selectedKeys): Throw ClosedSelectorException if selector
	is closed.
	(wakeup): Implemented.
	(deregisterCancelledKeys): Synchronize on cancelled key
	set before deregistering.
	(register): Synchronize on key set before registering.
	* java/nio/channels/spi/AbstractSelector.java
	Added import for java.nio.channels.ClosedSelectorException.
	(close): Added synchronized to method declaration.
	(begin): Removed.
	(end): Removed.
	(cancelledKeys): Throw ClosedSelectorException if selector
	is closed.
	(cancelKey): Synchronize on cancelled key set before key.

Index: gnu/java/nio/SelectorImpl.java
===================================================================
RCS file: /cvs/gcc/gcc/libjava/gnu/java/nio/SelectorImpl.java,v
retrieving revision 1.12
diff -u -2 -r1.12 SelectorImpl.java
--- gnu/java/nio/SelectorImpl.java	9 Dec 2003 15:34:07 -0000	1.12
+++ gnu/java/nio/SelectorImpl.java	18 Dec 2003 17:24:50 -0000
@@ -66,4 +66,20 @@
   private Set selected;
 
+  // A dummy object whose monitor regulates access to both our
+  // selectThread and unhandledWakeup fields.
+  private Object selectThreadMutex = new Object ();
+  
+  // Any thread that's currently blocked in a select operation.
+  private Thread selectThread = null;
+  
+  // Indicates whether we have an unhandled wakeup call. This can
+  // be due to either wakeup() triggering a thread interruption while
+  // a thread was blocked in a select operation (in which case we need
+  // to reset this thread's interrupt status after interrupting the
+  // select), or else that no thread was on a select operation at the
+  // time that wakeup() was called, in which case the following select()
+  // operation should return immediately with nothing selected.
+  private boolean unhandledWakeup = false;
+
   public SelectorImpl (SelectorProvider provider)
   {
@@ -82,10 +98,24 @@
     throws IOException
   {
-    // FIXME: We surely need to do more here.
+    // Cancel any pending select operation.
     wakeup();
+    
+    synchronized (keys)
+      {
+        synchronized (selected)
+          {
+            synchronized (cancelledKeys ())
+              {
+                // FIXME: Release resources here.
+              }
+          }
+      }
   }
 
   public final Set keys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException ();
+
     return Collections.unmodifiableSet (keys);
   }
@@ -94,4 +124,6 @@
     throws IOException
   {
+    // FIXME: We're simulating an immediate select
+    // via a select with a timeout of one millisecond.
     return select (1);
   }
@@ -100,8 +132,8 @@
     throws IOException
   {
-    return select (-1);
+    return select (0);
   }
 
-  // A timeout value of -1 means block forever.
+  // A timeout value of 0 means block forever.
   private static native int implSelect (int[] read, int[] write,
                                         int[] except, long timeout)
@@ -145,93 +177,149 @@
   }
 
-  public int select (long timeout)
+  public synchronized int select (long timeout)
     throws IOException
   {
     if (!isOpen())
       throw new ClosedSelectorException ();
-
-    if (keys == null)
-	    {
-        return 0;
-	    }
-
-    deregisterCancelledKeys();
-
-    // Set only keys with the needed interest ops into the arrays.
-    int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT);
-    int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
-    int[] except = new int [0]; // FIXME: We dont need to check this yet
-    int anzahl = read.length + write.length + except.length;
-
-    // Call the native select() on all file descriptors.
-    begin();
-    int result = implSelect (read, write, except, timeout);
-    end();
-
-    Iterator it = keys.iterator ();
-
-    while (it.hasNext ())
+      
+    synchronized (keys)
       {
-        int ops = 0;
-        SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
-
-        // If key is already selected retrieve old ready ops.
-        if (selected.contains (key))
+        synchronized (selected)
           {
-            ops = key.readyOps ();
-          }
+            deregisterCancelledKeys();
 
-        // Set new ready read/accept ops
-        for (int i = 0; i < read.length; i++)
-          {
-            if (key.getNativeFD() == read[i])
+            // Set only keys with the needed interest ops into the arrays.
+            int[] read = getFDsAsArray (SelectionKey.OP_READ
+                                        | SelectionKey.OP_ACCEPT);
+            int[] write = getFDsAsArray (SelectionKey.OP_WRITE
+                                         | SelectionKey.OP_CONNECT);
+
+            // FIXME: We dont need to check this yet
+            int[] except = new int [0];
+
+            // Test to see if we've got an unhandled wakeup call,
+            // in which case we return immediately. Otherwise,
+            // remember our current thread and jump into the select.
+            // The monitor for dummy object selectThreadMutex regulates
+            // access to these fields.
+
+            // FIXME: Not sure from the spec at what point we should
+            // return "immediately". Is it here or immediately upon
+            // entry to this function?
+            
+            // NOTE: There's a possibility of another thread calling
+            // wakeup() immediately after our thread releases
+            // selectThreadMutex's monitor here, in which case we'll
+            // do the select anyway. Since calls to wakeup() and select()
+            // among different threads happen in non-deterministic order,
+            // I don't think this is an issue.
+            synchronized (selectThreadMutex)
               {
-                if (key.channel () instanceof ServerSocketChannelImpl)
+                if (unhandledWakeup)
                   {
-                    ops = ops | SelectionKey.OP_ACCEPT;
+                    unhandledWakeup = false;
+                    return 0;
                   }
                 else
                   {
-                    ops = ops | SelectionKey.OP_READ;
+                    selectThread = Thread.currentThread ();
                   }
               }
-          }
 
-        // Set new ready write ops
-        for (int i = 0; i < write.length; i++)
-          {
-            if (key.getNativeFD() == write[i])
+            // Call the native select() on all file descriptors.
+            int result = implSelect (read, write, except, timeout);
+
+            // If our unhandled wakeup flag is set at this point,
+            // reset our thread's interrupt flag because we were
+            // awakened by wakeup() instead of an external thread
+            // interruption.
+            //
+            // NOTE: If we were blocked in a select() and one thread
+            // called Thread.interrupt() on the blocked thread followed
+            // by another thread calling Selector.wakeup(), then race
+            // conditions could make it so that the thread's interrupt
+            // flag is reset even though the Thread.interrupt() call
+            // "was there first". I don't think we need to care about
+            // this scenario.
+            synchronized (selectThreadMutex)
               {
-                ops = ops | SelectionKey.OP_WRITE;
-                
-//                 if (key.channel ().isConnected ())
-//                   {
-//                     ops = ops | SelectionKey.OP_WRITE;
-//                   }
-//                 else
-//                   {
-//                     ops = ops | SelectionKey.OP_CONNECT;
-//                   }
-             }
-          }
+                if (unhandledWakeup)
+                  {
+                    unhandledWakeup = false;
+                    selectThread.interrupted ();
+                  }
+                selectThread = null;
+              }
 
-        // FIXME: We dont handle exceptional file descriptors yet.
+            Iterator it = keys.iterator ();
 
-        // If key is not yet selected add it.
-        if (!selected.contains (key))
-          {
-            selected.add (key);
-          }
+            while (it.hasNext ())
+              {
+                int ops = 0;
+                SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
 
-        // Set new ready ops
-        key.readyOps (key.interestOps () & ops);
-      }
+                // If key is already selected retrieve old ready ops.
+                if (selected.contains (key))
+                  {
+                    ops = key.readyOps ();
+                  }
 
-    deregisterCancelledKeys();
-    return result;
+                // Set new ready read/accept ops
+                for (int i = 0; i < read.length; i++)
+                  {
+                    if (key.getNativeFD() == read[i])
+                      {
+                        if (key.channel () instanceof ServerSocketChannelImpl)
+                          {
+                            ops = ops | SelectionKey.OP_ACCEPT;
+                          }
+                        else
+                          {
+                            ops = ops | SelectionKey.OP_READ;
+                          }
+                      }
+                  }
+
+                // Set new ready write ops
+                for (int i = 0; i < write.length; i++)
+                  {
+                    if (key.getNativeFD() == write[i])
+                      {
+                        ops = ops | SelectionKey.OP_WRITE;
+
+        //                 if (key.channel ().isConnected ())
+        //                   {
+        //                     ops = ops | SelectionKey.OP_WRITE;
+        //                   }
+        //                 else
+        //                   {
+        //                     ops = ops | SelectionKey.OP_CONNECT;
+        //                   }
+                     }
+                  }
+
+                // FIXME: We dont handle exceptional file descriptors yet.
+
+                // If key is not yet selected add it.
+                if (!selected.contains (key))
+                  {
+                    selected.add (key);
+                  }
+
+                // Set new ready ops
+                key.readyOps (key.interestOps () & ops);
+              }
+            deregisterCancelledKeys();
+            
+            return result;
+          }
+        }
   }
     
   public final Set selectedKeys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException ();
+
     return selected;
   }
@@ -239,16 +327,38 @@
   public final Selector wakeup()
   {
-    return null;
+    // IMPLEMENTATION NOTE: Whereas the specification says that
+    // thread interruption should trigger a call to wakeup, we
+    // do the reverse under the covers: wakeup triggers a thread
+    // interrupt followed by a subsequent reset of the thread's
+    // interrupt status within select().
+    
+    // First, acquire the monitor of the object regulating
+    // access to our selectThread and unhandledWakeup fields.
+    synchronized (selectThreadMutex)
+      {
+        unhandledWakeup = true;
+        
+        // Interrupt any thread which is currently blocked in
+        // a select operation.
+        if (selectThread != null)
+          selectThread.interrupt ();
+      }
+      
+    return this;
   }
 
   private final void deregisterCancelledKeys()
   {
-    Iterator it = cancelledKeys().iterator();
-
-    while (it.hasNext ())
-      {
-        keys.remove ((SelectionKeyImpl) it.next ());
-        it.remove ();
-      }
+    Set ckeys = cancelledKeys ();
+    synchronized (ckeys)
+    {
+      Iterator it = ckeys.iterator();
+
+      while (it.hasNext ())
+        {
+          keys.remove ((SelectionKeyImpl) it.next ());
+          it.remove ();
+        }
+    }
   }
 
@@ -283,5 +393,9 @@
       }
 
-    keys.add (result);
+    synchronized (keys)
+      {
+        keys.add (result);
+      }
+
     result.interestOps (ops);
     result.attach (att);
Index: java/nio/channels/spi/AbstractSelector.java
===================================================================
RCS file: /cvs/gcc/gcc/libjava/java/nio/channels/spi/AbstractSelector.java,v
retrieving revision 1.4
diff -u -2 -r1.4 AbstractSelector.java
--- java/nio/channels/spi/AbstractSelector.java	9 Oct 2003 17:34:10 -0000	1.4
+++ java/nio/channels/spi/AbstractSelector.java	18 Dec 2003 17:24:54 -0000
@@ -40,4 +40,5 @@
 
 import java.io.IOException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -65,5 +66,5 @@
    * @exception IOException If an error occurs
    */
-  public final void close () throws IOException
+  public final synchronized void close () throws IOException
   {
     if (closed)
@@ -82,18 +83,4 @@
   }
 
-  /**
-   * Marks the beginning of an I/O operation that might block indefinitely.
-   */
-  protected final void begin()
-  {
-  }
-
-  /**
-   * Marks the end of an I/O operation that might block indefinitely.
-   */
-  protected final void end()
-  {
-  }
-    
   public final SelectorProvider provider ()
   {
@@ -103,4 +90,7 @@
   protected final Set cancelledKeys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException ();
+
     return cancelledKeys;
   }
@@ -108,5 +98,8 @@
   final void cancelKey (AbstractSelectionKey key)
   {
-    cancelledKeys.remove (key);
+    synchronized (cancelledKeys)
+      {
+        cancelledKeys.remove (key);
+      }
   }
 

-------------- next part --------------
A non-text attachment was scrubbed...
Name: NetTest.tar.bz2
Type: application/bzip2
Size: 5799 bytes
Desc: not available
URL: <http://gcc.gnu.org/pipermail/java-patches/attachments/20031218/496dafd1/attachment.bin>


More information about the Java-patches mailing list