This is the mail archive of the java-patches@gcc.gnu.org mailing list for the Java project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

[PATCH] for Review: SelectorImpl.wakeup() + Synchronization (UPDATED)


Hi Michael,

>I'll redo the patch with the corrections you mentioned, leave AbstractSelector
>alone and make the begin() and end() calls (which will do nothing). I'll
>let you figure out what to do from there. If it's any consolation, the output
>diffs beautifully between Sun's JRE and post-patch gcj.

Here is the updated patch for this:

http://gcc.gnu.org/ml/java-patches/2003-q4/msg00815.html

...with the corrections you asked for. The output is identical
to the previous patch.

-- Mohan
http://www.thisiscool.com/
http://www.animalsong.org/

ChangeLog
2003-12-19  Mohan Embar  <gnustuff@thisiscool.com>

	* gnu/java/nio/SelectorImpl.java
	(selectThreadMutex): New field.
	(selectThread): New field.
	(unhandledWakeup): New field.
	(implCloseSelector): Added skeleton code which
	synchronizes as per Sun JRE JavaDoc.
	(keys): Throw ClosedSelectorException if selector
	is closed.
	(selectNow): Added comment that we're faking out
	an immediate select with a one-microsecond-timeout one.
	(select): Use 0 instead of -1 for infinite timeout.
	(implSelect): Changed comment in declaration.
	(select): Added synchronized to method declaration.
	Added synchronization and wakeup support as per Sun
	JRE JavaDoc.
	(selectedKeys): Throw ClosedSelectorException if selector
	is closed.
	(wakeup): Implemented.
	(deregisterCancelledKeys): Synchronize on cancelled key
	set before deregistering.
	(register): Synchronize on key set before registering.
	* java/nio/channels/spi/AbstractSelector.java
	Added import for java.nio.channels.ClosedSelectorException.
	(close): Added synchronized to method declaration.
	(cancelledKeys): Throw ClosedSelectorException if selector
	is closed.
	(cancelKey): Synchronize on cancelled key set before key.

Index: gnu/java/nio/SelectorImpl.java
===================================================================
RCS file: /cvs/gcc/gcc/libjava/gnu/java/nio/SelectorImpl.java,v
retrieving revision 1.12
diff -u -2 -r1.12 SelectorImpl.java
--- gnu/java/nio/SelectorImpl.java	9 Dec 2003 15:34:07 -0000	1.12
+++ gnu/java/nio/SelectorImpl.java	20 Dec 2003 00:38:31 -0000
@@ -66,4 +66,26 @@
   private Set selected;
 
+  /**
+   * A dummy object whose monitor regulates access to both our
+   * selectThread and unhandledWakeup fields.
+   */
+  private Object selectThreadMutex = new Object ();
+  
+  /**
+   * Any thread that's currently blocked in a select operation.
+   */
+  private Thread selectThread;
+  
+  /**
+   * Indicates whether we have an unhandled wakeup call. This can
+   * be due to either wakeup() triggering a thread interruption while
+   * a thread was blocked in a select operation (in which case we need
+   * to reset this thread's interrupt status after interrupting the
+   * select), or else that no thread was on a select operation at the
+   * time that wakeup() was called, in which case the following select()
+   * operation should return immediately with nothing selected.
+   */
+  private boolean unhandledWakeup;
+
   public SelectorImpl (SelectorProvider provider)
   {
@@ -82,10 +104,24 @@
     throws IOException
   {
-    // FIXME: We surely need to do more here.
+    // Cancel any pending select operation.
     wakeup();
+    
+    synchronized (keys)
+      {
+        synchronized (selected)
+          {
+            synchronized (cancelledKeys ())
+              {
+                // FIXME: Release resources here.
+              }
+          }
+      }
   }
 
   public final Set keys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException();
+
     return Collections.unmodifiableSet (keys);
   }
@@ -94,4 +130,6 @@
     throws IOException
   {
+    // FIXME: We're simulating an immediate select
+    // via a select with a timeout of one millisecond.
     return select (1);
   }
@@ -100,8 +138,8 @@
     throws IOException
   {
-    return select (-1);
+    return select (0);
   }
 
-  // A timeout value of -1 means block forever.
+  // A timeout value of 0 means block forever.
   private static native int implSelect (int[] read, int[] write,
                                         int[] except, long timeout)
@@ -145,93 +183,158 @@
   }
 
-  public int select (long timeout)
+  public synchronized int select (long timeout)
     throws IOException
   {
     if (!isOpen())
-      throw new ClosedSelectorException ();
-
-    if (keys == null)
-	    {
-        return 0;
-	    }
-
-    deregisterCancelledKeys();
-
-    // Set only keys with the needed interest ops into the arrays.
-    int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT);
-    int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
-    int[] except = new int [0]; // FIXME: We dont need to check this yet
-    int anzahl = read.length + write.length + except.length;
-
-    // Call the native select() on all file descriptors.
-    begin();
-    int result = implSelect (read, write, except, timeout);
-    end();
-
-    Iterator it = keys.iterator ();
-
-    while (it.hasNext ())
+      throw new ClosedSelectorException();
+      
+    synchronized (keys)
       {
-        int ops = 0;
-        SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
-
-        // If key is already selected retrieve old ready ops.
-        if (selected.contains (key))
+        synchronized (selected)
           {
-            ops = key.readyOps ();
-          }
+            deregisterCancelledKeys();
 
-        // Set new ready read/accept ops
-        for (int i = 0; i < read.length; i++)
-          {
-            if (key.getNativeFD() == read[i])
+            // Set only keys with the needed interest ops into the arrays.
+            int[] read = getFDsAsArray (SelectionKey.OP_READ
+                                        | SelectionKey.OP_ACCEPT);
+            int[] write = getFDsAsArray (SelectionKey.OP_WRITE
+                                         | SelectionKey.OP_CONNECT);
+
+            // FIXME: We dont need to check this yet
+            int[] except = new int [0];
+
+            // Test to see if we've got an unhandled wakeup call,
+            // in which case we return immediately. Otherwise,
+            // remember our current thread and jump into the select.
+            // The monitor for dummy object selectThreadMutex regulates
+            // access to these fields.
+
+            // FIXME: Not sure from the spec at what point we should
+            // return "immediately". Is it here or immediately upon
+            // entry to this function?
+            
+            // NOTE: There's a possibility of another thread calling
+            // wakeup() immediately after our thread releases
+            // selectThreadMutex's monitor here, in which case we'll
+            // do the select anyway. Since calls to wakeup() and select()
+            // among different threads happen in non-deterministic order,
+            // I don't think this is an issue.
+            synchronized (selectThreadMutex)
               {
-                if (key.channel () instanceof ServerSocketChannelImpl)
+                if (unhandledWakeup)
                   {
-                    ops = ops | SelectionKey.OP_ACCEPT;
+                    unhandledWakeup = false;
+                    return 0;
                   }
                 else
                   {
-                    ops = ops | SelectionKey.OP_READ;
+                    selectThread = Thread.currentThread ();
                   }
               }
-          }
 
-        // Set new ready write ops
-        for (int i = 0; i < write.length; i++)
-          {
-            if (key.getNativeFD() == write[i])
+            // Call the native select() on all file descriptors.
+            int result = 0;
+            try
               {
-                ops = ops | SelectionKey.OP_WRITE;
-                
-//                 if (key.channel ().isConnected ())
-//                   {
-//                     ops = ops | SelectionKey.OP_WRITE;
-//                   }
-//                 else
-//                   {
-//                     ops = ops | SelectionKey.OP_CONNECT;
-//                   }
-             }
-          }
+                begin();
+                result = implSelect (read, write, except, timeout);
+              }
+            finally
+              {
+                end();
+              }
 
-        // FIXME: We dont handle exceptional file descriptors yet.
+            // If our unhandled wakeup flag is set at this point,
+            // reset our thread's interrupt flag because we were
+            // awakened by wakeup() instead of an external thread
+            // interruption.
+            //
+            // NOTE: If we were blocked in a select() and one thread
+            // called Thread.interrupt() on the blocked thread followed
+            // by another thread calling Selector.wakeup(), then race
+            // conditions could make it so that the thread's interrupt
+            // flag is reset even though the Thread.interrupt() call
+            // "was there first". I don't think we need to care about
+            // this scenario.
+            synchronized (selectThreadMutex)
+              {
+                if (unhandledWakeup)
+                  {
+                    unhandledWakeup = false;
+                    selectThread.interrupted ();
+                  }
+                selectThread = null;
+              }
 
-        // If key is not yet selected add it.
-        if (!selected.contains (key))
-          {
-            selected.add (key);
-          }
+            Iterator it = keys.iterator ();
 
-        // Set new ready ops
-        key.readyOps (key.interestOps () & ops);
-      }
+            while (it.hasNext ())
+              {
+                int ops = 0;
+                SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
 
-    deregisterCancelledKeys();
-    return result;
+                // If key is already selected retrieve old ready ops.
+                if (selected.contains (key))
+                  {
+                    ops = key.readyOps ();
+                  }
+
+                // Set new ready read/accept ops
+                for (int i = 0; i < read.length; i++)
+                  {
+                    if (key.getNativeFD() == read[i])
+                      {
+                        if (key.channel () instanceof ServerSocketChannelImpl)
+                          {
+                            ops = ops | SelectionKey.OP_ACCEPT;
+                          }
+                        else
+                          {
+                            ops = ops | SelectionKey.OP_READ;
+                          }
+                      }
+                  }
+
+                // Set new ready write ops
+                for (int i = 0; i < write.length; i++)
+                  {
+                    if (key.getNativeFD() == write[i])
+                      {
+                        ops = ops | SelectionKey.OP_WRITE;
+
+        //                 if (key.channel ().isConnected ())
+        //                   {
+        //                     ops = ops | SelectionKey.OP_WRITE;
+        //                   }
+        //                 else
+        //                   {
+        //                     ops = ops | SelectionKey.OP_CONNECT;
+        //                   }
+                     }
+                  }
+
+                // FIXME: We dont handle exceptional file descriptors yet.
+
+                // If key is not yet selected add it.
+                if (!selected.contains (key))
+                  {
+                    selected.add (key);
+                  }
+
+                // Set new ready ops
+                key.readyOps (key.interestOps () & ops);
+              }
+            deregisterCancelledKeys();
+            
+            return result;
+          }
+        }
   }
     
   public final Set selectedKeys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException();
+
     return selected;
   }
@@ -239,16 +342,38 @@
   public final Selector wakeup()
   {
-    return null;
+    // IMPLEMENTATION NOTE: Whereas the specification says that
+    // thread interruption should trigger a call to wakeup, we
+    // do the reverse under the covers: wakeup triggers a thread
+    // interrupt followed by a subsequent reset of the thread's
+    // interrupt status within select().
+    
+    // First, acquire the monitor of the object regulating
+    // access to our selectThread and unhandledWakeup fields.
+    synchronized (selectThreadMutex)
+      {
+        unhandledWakeup = true;
+        
+        // Interrupt any thread which is currently blocked in
+        // a select operation.
+        if (selectThread != null)
+          selectThread.interrupt ();
+      }
+      
+    return this;
   }
 
   private final void deregisterCancelledKeys()
   {
-    Iterator it = cancelledKeys().iterator();
-
-    while (it.hasNext ())
-      {
-        keys.remove ((SelectionKeyImpl) it.next ());
-        it.remove ();
-      }
+    Set ckeys = cancelledKeys ();
+    synchronized (ckeys)
+    {
+      Iterator it = ckeys.iterator();
+
+      while (it.hasNext ())
+        {
+          keys.remove ((SelectionKeyImpl) it.next ());
+          it.remove ();
+        }
+    }
   }
 
@@ -283,5 +408,9 @@
       }
 
-    keys.add (result);
+    synchronized (keys)
+      {
+        keys.add (result);
+      }
+
     result.interestOps (ops);
     result.attach (att);
Index: java/nio/channels/spi/AbstractSelector.java
===================================================================
RCS file: /cvs/gcc/gcc/libjava/java/nio/channels/spi/AbstractSelector.java,v
retrieving revision 1.4
diff -u -2 -r1.4 AbstractSelector.java
--- java/nio/channels/spi/AbstractSelector.java	9 Oct 2003 17:34:10 -0000	1.4
+++ java/nio/channels/spi/AbstractSelector.java	20 Dec 2003 00:38:35 -0000
@@ -40,4 +40,5 @@
 
 import java.io.IOException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -65,5 +66,5 @@
    * @exception IOException If an error occurs
    */
-  public final void close () throws IOException
+  public final synchronized void close () throws IOException
   {
     if (closed)
@@ -103,4 +104,7 @@
   protected final Set cancelledKeys()
   {
+    if (!isOpen())
+      throw new ClosedSelectorException();
+
     return cancelledKeys;
   }
@@ -108,5 +112,8 @@
   final void cancelKey (AbstractSelectionKey key)
   {
-    cancelledKeys.remove (key);
+    synchronized (cancelledKeys)
+      {
+        cancelledKeys.remove(key);
+      }
   }
 




Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]