GUIDE: Working in a Multithreaded Environment with an ExecutorService Pool

Joe

VIP Member
21/1/13
2,925
1,312
113
Hi

Sooner or later every developer is confronted with the problem of how a shared object can be processed consistently in a multithreaded environment. In short: how do you arrange multiple threads to work with one or more shared objects without messing up the results? You may say: "Oh! That's easy and simple: I use synchronization!"

Well, true... but a bit too simple. In many cases a synchronized method or block works. In complicated situations, however, such as a synchronized method that performs an update by invoking other methods which may belong to other objects (and may in turn acquire other locks), things can get very tricky.

Huh? Where is the trickiness? The cardinal difference between a synchronized block and an explicitly locked block is that "synchronized" can cause "starvation" of other threads. If a thread holds an object's lock via "synchronized" and somehow runs into trouble during processing (for example, it blocks forever), the other threads waiting for that lock can wait to death: a thread waiting to enter a synchronized block or method cannot be interrupted, cannot time out, and cannot ask whether the lock holder is still working or hangs... and your app could "hang" forever.

Java offers developers various locking mechanisms in the package java.util.concurrent.locks (see its Javadoc for details), and with them developers can get rid of the dangerous "app hanging". The second tool is the ExecutorService pool. Creating a new Thread for every Runnable is wasteful: a Thread cannot be restarted once it has finished, and every thread costs memory and start-up time. That can lead to memory shortage or sluggish performance. To overcome such problems, the package java.util.concurrent provides thread pools that reuse a limited number of threads to run many tasks (Callable or Runnable) and share resources, which can improve processing performance tremendously.
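As a minimal sketch of how an explicit lock avoids the "app hanging", the hypothetical class below uses ReentrantLock.tryLock(timeout, unit): instead of waiting forever like a thread blocked on a synchronized monitor, the caller gives up after the timeout and can react. The class name TryLockDemo, the "hog" thread and the 100 ms timeout are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a thread gives up after a timeout instead of hanging forever
public class TryLockDemo {
  private static final ReentrantLock lock = new ReentrantLock();

  // returns true if the lock could be acquired within the timeout
  public static boolean tryWork(long timeoutMs) throws InterruptedException {
    if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      try {
        return true;          // the critical section would go here
      } finally {
        lock.unlock();
      }
    }
    return false;             // timed out: report, retry later or take another path
  }

  public static void main(String[] args) throws Exception {
    CountDownLatch held = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    // simulates a "hanging" thread that holds the lock and does not release it
    Thread hog = new Thread(() -> {
      lock.lock();
      try {
        held.countDown();
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        lock.unlock();
      }
    });
    hog.start();
    held.await();             // make sure the hog really owns the lock
    System.out.println("acquired while hog holds the lock: " + tryWork(100)); // false
    release.countDown();      // let the hog finish
    hog.join();
    System.out.println("acquired after hog released it: " + tryWork(100));   // true
  }
}
```

With "synchronized" there is no equivalent of tryLock: the waiting thread has no way to back out.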

Let's start with the synchronized block or method. If the entire method is synchronized, it is a synchronized method; if only a group of statements is guarded by synchronized(someObject), it is a synchronized block. The lock of a synchronized block must be an object reference: if the shared variable is a primitive (such as int, short, double, etc.), you cannot synchronize on it and must use another object, commonly "this", instead of the variable name.
Java:
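import java.util.ArrayList;

// wrapper class so that the snippets compile together
public class SyncDemo {
  int count = 0;
  // synchronized method: the whole body is guarded by the monitor of "this"
  public synchronized void counter() {
    count++;
  }
  // synchronized block: only the statements inside the block are guarded
  public void counterBlock() {
     synchronized(this) { // "this" is used because count is an int, a primitive, which cannot be locked
         count++;
     }
  }
  //
  ArrayList<String> lst = new ArrayList<>();
  public void add(String s) {
     synchronized(lst) {   // lst is an object, so it can serve as the lock
        lst.add(s);
     }
  }
}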
The next question: there are several types of ExecutorService pools, so which one should I use? The answer depends on the tasks you are working with. Many short-lived tasks (Callable or Runnable) are best served by newCachedThreadPool, relatively long-running tasks by newFixedThreadPool, and so on. Nowadays, however, computers are equipped with multi-core processors, and you can exploit them with newWorkStealingPool (parallel processing). The other pools serve specific purposes (e.g. scheduling). The following code and screenshots show the difference between FixedThreadPool and WorkStealingPool:
Java:
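import java.util.ArrayList;
import java.util.concurrent.*;

public class SynLock {
  public SynLock( ) { }
  // only one thread at a time can run counter() on the same SynLock instance
  public synchronized void counter() {
    count++;
  }
  public int getCounter() {
    return count;
  }
  // submits max counter() tasks to the chosen pool; returns the elapsed time in ms
  public long process(int max, boolean fixed) {
    pool = fixed? Executors.newFixedThreadPool(100):
                  Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
    ArrayList<Future<?>> fu = new ArrayList<>();
    long beg = System.currentTimeMillis();
    for (int i = 0; i < max; ++i) {
      fu.add(pool.submit(()->counter()));
    }
    try {
      for (Future<?> f : fu) f.get(); // wait until every task has finished
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
      throw new RuntimeException(ex.getCause());
    }
    beg = System.currentTimeMillis()-beg;
    pool.shutdownNow();
    return beg;
  }
  private ExecutorService pool;
  private int count = 0;
}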
[Screenshots: SynLock results with FixedThreadPool and with WorkStealingPool]
See the differences? That is how "Competitive Programming" works: same task, different outcomes.
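For reference, here is a minimal sketch of the pool factories mentioned above, all from java.util.concurrent.Executors. The class PoolKinds, the pool sizes and the trivial tasks are illustrative assumptions; since every pool is used through the same ExecutorService interface, switching pools is a one-line change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Hypothetical demo: the same batch of tasks on different kinds of pools
public class PoolKinds {
  // sums 1..n by submitting one Callable per number to the given pool
  public static long sumWith(ExecutorService pool, int n) throws Exception {
    try {
      List<Callable<Long>> tasks = new ArrayList<>();
      for (int i = 1; i <= n; i++) {
        final long v = i;
        tasks.add(() -> v);              // a very short-lived task
      }
      long sum = 0;
      for (Future<Long> f : pool.invokeAll(tasks)) sum += f.get();
      return sum;
    } finally {
      pool.shutdown();                   // always release the pool's threads
    }
  }

  public static void main(String[] args) throws Exception {
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println(sumWith(Executors.newCachedThreadPool(), 1000));      // 500500, threads created on demand
    System.out.println(sumWith(Executors.newFixedThreadPool(4), 1000));      // 500500, bounded number of threads
    System.out.println(sumWith(Executors.newWorkStealingPool(cores), 1000)); // 500500, ForkJoinPool with work stealing
    ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);    // the "scheduling" kind
    ScheduledFuture<String> later = sched.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);
    System.out.println(later.get());                                         // done
    sched.shutdown();
  }
}
```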
 
So, where are the differences (or problems) of "synchronized" besides the risk of hanging? The following table highlights the differences between "synchronized" and the locking mechanism:
Code:
+--------------------+--------------------------+----------------------------+
| Features           | synchronized             | locking mechanism          |
+--------------------+--------------------------+----------------------------+
| Acquire a lock     | keyword: synchronized    | explicit lock()            |
| Release a lock     | implicit release         | explicit unlock()          |
| Fairness           | not guaranteed           | optional (fair mode: FIFO) |
| Interruptibility   | impossible               | yes: lockInterruptibly()   |
| Lock/Unlock order  | nested: released in      | in any order               |
|                    | reverse order of locking |                            |
| Across methods     | impossible               | possible                   |
| Exception control  | lock is released         | can be caught; the lock    |
|                    | automatically            | can be released (unlock)   |
|                    |                          | or kept                    |
+--------------------+--------------------------+----------------------------+
The most important features are "across methods" and "exception control". These features are very useful when working with databases, where lock() and unlock() can be spread across several method invocations.
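The "Interruptibility" row of the table can be illustrated with a minimal sketch (the class InterruptDemo is hypothetical): a thread waiting in lockInterruptibly() leaves the wait with an InterruptedException as soon as it is interrupted, whereas a thread waiting to enter a synchronized block would simply keep waiting.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of the "Interruptibility" row of the table
public class InterruptDemo {
  // returns true if the waiting thread was interrupted while waiting for the lock
  public static boolean waiterGetsInterrupted() throws InterruptedException {
    ReentrantLock lock = new ReentrantLock();
    AtomicBoolean interrupted = new AtomicBoolean(false);
    lock.lock();                  // the main thread holds the lock for the whole demo
    try {
      Thread waiter = new Thread(() -> {
        try {
          lock.lockInterruptibly(); // blocks, because the main thread owns the lock
          try {
            System.out.println("never reached in this demo");
          } finally {
            lock.unlock();
          }
        } catch (InterruptedException e) {
          interrupted.set(true);  // the wait was cancelled: no hang
        }
      });
      waiter.start();
      waiter.interrupt();         // ask the waiter to give up
      waiter.join();              // returns because the waiter left lockInterruptibly()
    } finally {
      lock.unlock();
    }
    return interrupted.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("waiter interrupted: " + waiterGetsInterrupted()); // true
  }
}
```

The result does not depend on timing: if the interrupt arrives before the waiter reaches lockInterruptibly(), that call throws InterruptedException immediately.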

There are many ways to keep objects consistent in a multithreaded environment. The first and easiest is the "synchronized" technique. It's fast, but it has some disadvantages. Let's talk about the most common locking techniques:
  1. ReentrantLock
  2. ReentrantReadWriteLock
  3. Atomic lock (e.g. AtomicInteger, AtomicLong, etc.)
  4. ObjectLock using the wait() and notify() methods of class Object
  5. StampedLock
  6. StampedLock with tryConvertToWriteLock() (a read lock upgraded to a write lock)
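The ReentrantLock is the most basic way to lock an object for processing and finally to unlock it. What is "reentrant" anyway? In computer science, reentrancy (from re-enter) describes code that can be interrupted in the middle of its execution and safely entered again before the earlier invocation has finished. For a lock it means that a thread that already holds the lock can acquire it again without blocking itself; it must then call unlock() as many times as it called lock().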
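A minimal sketch of lock reentrancy, assuming a hypothetical class ReentrancyDemo: outer() holds the lock and calls inner(), which locks it again; getHoldCount() shows the nesting depth. (Java's synchronized monitors are reentrant as well.)

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the owning thread may lock a ReentrantLock again
public class ReentrancyDemo {
  private final ReentrantLock lock = new ReentrantLock();
  private int value = 0;

  public void outer() {
    lock.lock();
    try {
      inner();       // re-enters a lock this thread already holds: no self-deadlock
    } finally {
      lock.unlock();
    }
  }

  public void inner() {
    lock.lock();     // the hold count goes from 1 to 2
    try {
      value++;
      System.out.println("hold count inside inner(): " + lock.getHoldCount());
    } finally {
      lock.unlock(); // back to 1; outer() releases the last hold
    }
  }

  public int getValue() { return value; }
  public boolean isLocked() { return lock.isLocked(); }

  public static void main(String[] args) {
    ReentrancyDemo d = new ReentrancyDemo();
    d.outer();                                             // prints "hold count inside inner(): 2"
    System.out.println(d.getValue() + " " + d.isLocked()); // prints "1 false"
  }
}
```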
Java:
// Joe Nartca (C)
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
//
public class ApiLock {
  public ApiLock( ) { }
  public void counter() {
    myLock.lock();      // acquire BEFORE try, so finally only unlocks a lock we hold
    try {
      count++;
    } finally {
      myLock.unlock();
    }
  }
  public int getCounter() {
    return count;
  }
  // submits max counter() tasks to the chosen pool; returns the elapsed time in ms
  public long process(int max, boolean fixed) {
    pool = fixed? Executors.newFixedThreadPool(100):
                  Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
    ArrayList<Future<?>> fu = new ArrayList<>();
    long beg = System.currentTimeMillis();
    for (int i = 0; i < max; ++i) {
      fu.add(pool.submit(()->counter()));
    }
    try {
      for (Future<?> f : fu) f.get(); // wait until every task has finished
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
      throw new RuntimeException(ex.getCause());
    }
    beg = System.currentTimeMillis()-beg;
    pool.shutdownNow();
    return beg;
  }
  private int count = 0;
  private ExecutorService pool;
  private Lock myLock = new ReentrantLock();
}
The Java construct
Java:
myLock.lock();     // acquire BEFORE try: if lock() fails, there is nothing to unlock
try {
  ...              // could cause problems, e.g. an Exception
} finally {
  myLock.unlock(); // always reached, so the lock is ALWAYS released!
}
makes sure that unlock() is ALWAYS executed. In this way the danger of starvation or inconsistency caused by a lock that is never released is eliminated.
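To see that guarantee in action, here is a minimal sketch with a hypothetical class FinallyUnlockDemo: the critical section throws, yet the lock is free afterwards, so no other thread would starve.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: finally releases the lock even when the critical section throws
public class FinallyUnlockDemo {
  private final ReentrantLock lock = new ReentrantLock();

  public void failingUpdate() {
    lock.lock();
    try {
      throw new IllegalStateException("simulated failure inside the critical section");
    } finally {
      lock.unlock(); // runs although the exception propagates to the caller
    }
  }

  public boolean isLocked() { return lock.isLocked(); }

  public static void main(String[] args) {
    FinallyUnlockDemo d = new FinallyUnlockDemo();
    try {
      d.failingUpdate();
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
    System.out.println("still locked? " + d.isLocked()); // still locked? false
  }
}
```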
[Screenshot: ReentrantLock test]
 
What should we understand by the terms "across methods", "exception control" and "lock/unlock order"?
  • Across methods: a lock can be acquired in method A and released (unlocked) in method B. With "synchronized" the lock is released as soon as the method or block terminates.
  • Exception control: with try-catch-finally you decide what happens to the lock when an exception is thrown: free it in finally, or deliberately keep it when it is unlocked elsewhere. With "synchronized" the lock is always freed when an exception forces the method or block to terminate.
  • Lock/unlock order: as said under "across methods", lock and unlock can be distributed over different methods, and each method can be invoked in any order, as long as a ReentrantLock is unlocked by the same thread that holds it.
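The "across methods" idea can be sketched as follows. The class Account and its methods begin(), withdraw(), commit() and rollback() are hypothetical, a transaction-like illustration of the database scenario mentioned earlier. Note that a ReentrantLock must be unlocked by the thread that locked it; otherwise unlock() throws IllegalMonitorStateException.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: lock() in one method, unlock() in another
public class Account {
  private final ReentrantLock lock = new ReentrantLock();
  private int balance = 100;
  private int pending;

  // step 1: acquire the lock; it stays held after begin() returns
  public void begin() {
    lock.lock();
    pending = balance;
  }

  // step 2: work on the pending value while the lock is held
  public void withdraw(int amount) {
    if (!lock.isHeldByCurrentThread()) throw new IllegalStateException("call begin() first");
    if (amount > pending) throw new IllegalArgumentException("insufficient funds");
    pending -= amount;
  }

  // step 3a: publish the result and release the lock
  public void commit() {
    try {
      balance = pending;
    } finally {
      lock.unlock(); // IllegalMonitorStateException if this thread does not hold the lock
    }
  }

  // step 3b: discard the pending value and release the lock
  public void rollback() {
    lock.unlock();
  }

  public int getBalance() {
    lock.lock();     // reentrant, so this also works between begin() and commit()
    try {
      return balance;
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    Account acc = new Account();
    acc.begin();
    boolean ok = false;
    try {
      acc.withdraw(30);
      acc.withdraw(20);
      ok = true;
    } finally {
      if (ok) acc.commit(); else acc.rollback(); // the lock is released either way
    }
    System.out.println(acc.getBalance()); // 50
  }
}
```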
I've shown you the ReentrantLock and the "synchronized" lock. As you can see, both classes share two identical methods:
  • getCounter()
  • process()
The other locking mechanisms mentioned above look the same. So we "design-pattern" them (Template Method) with a generic abstract class called ALLLock that implements these two methods and holds the shared variables. Each subclass only needs to implement the method counter().
Java:
// Joe Nartca (C)
package locks;

import java.util.*;
import java.util.concurrent.*;
//
// Template for all lock variants: subclasses only implement counter()
public abstract class ALLLock {
  public ALLLock( ) { }
  // increments count under the subclass's locking strategy
  public abstract void counter();
  public int getCounter() {
    return count;
  }
  // submits max counter() tasks to the chosen pool; returns the elapsed time in ms
  public long process(int max, boolean fixed) {
    pool = fixed? Executors.newFixedThreadPool(100):
                  Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
    ArrayList<Future<?>> fu = new ArrayList<>();
    long beg = System.currentTimeMillis();
    for (int i = 0; i < max; ++i) {
      fu.add(pool.submit(()->counter()));
    }
    try {
      for (Future<?> f : fu) f.get(); // wait until every task has finished
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
      throw new RuntimeException(ex.getCause());
    }
    beg = System.currentTimeMillis()-beg;
    pool.shutdownNow();
    return beg;
  }
  private ExecutorService pool;
  protected int count = 0;
}
So here we go:

SynLock
Java:
package locks;

// "synchronized" variant: the monitor of this object guards count
public class SynLock extends ALLLock {
  public SynLock( ) {
    super();
  }
  public synchronized void counter() {
    count++;
  }
}
ApiLock (with ReentrantLock)
Java:
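package locks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ApiLock extends ALLLock {
  public ApiLock( ) {
    super();
  }
  public void counter() {
    myLock.lock();      // acquire BEFORE try, so finally only unlocks a lock we hold
    try {
      count++;
    } finally {
      myLock.unlock();
    }
  }
  private Lock myLock = new ReentrantLock();
}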
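ObjectLock with wait() and notify(). This locking mechanism uses the two methods wait() and notify() that every class inherits from Object; here they are used in a home-made lock class, MyLock.java.
Java:
package locks;

import joe.MyLock;  // Object Lock

public class MeLock extends ALLLock {
  public MeLock( ) {
    super();
  }
  public void counter() {
    try {
      myLock.lock();                      // may be interrupted while waiting
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // lock NOT acquired: do not unlock
      return;
    }
    try {
      count++;
    } finally {
      myLock.unlock();
    }
  }
  private MyLock myLock = new MyLock();
}
and the MyLock.java
Java:
package joe;
// A minimal, non-reentrant mutex built on the monitor of this object
public class MyLock{
  private boolean locked = false;
  public synchronized void lock() throws InterruptedException {
    while(locked) {   // loop guards against spurious wakeups
      wait();         // releases the monitor while waiting
    }
    locked = true;
  }
  public synchronized void unlock(){
    locked = false;
    notify();         // wake up one waiting thread
  }
}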
The atomic lock (here AtomicInteger). In many cases where primitives such as int, long, etc. are shared by threads, it's easier to work with the atomic classes of java.util.concurrent.atomic than with the cumbersome "synchronized". Strictly speaking they are not locks: they update the value with lock-free compare-and-set operations. In our case count is an int, hence we use AtomicInteger.
Java:
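package locks;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomLock extends ALLLock {
  public AtomLock( ) {
    super();
  }
  public void counter() {
    count.incrementAndGet(); // atomic read-modify-write, no lock needed
  }
  // override: report the atomic counter, not the int inherited from ALLLock
  @Override
  public int getCounter() {
    return count.get();
  }
  // hides the int count of ALLLock
  private AtomicInteger count = new AtomicInteger(0); // initialized to 0
}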
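To illustrate why no lock is needed: incrementAndGet() is an atomic read-modify-write. The sketch below achieves the same effect with an explicit compare-and-set retry loop built on AtomicInteger.get() and compareAndSet(); it illustrates the technique and is not the JDK's actual implementation. The class name CasCounter is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: lock-free increment via a compare-and-set retry loop
public class CasCounter {
  private final AtomicInteger count = new AtomicInteger(0);

  public int increment() {
    while (true) {
      int current = count.get();               // read the current value
      int next = current + 1;                  // compute the new value
      if (count.compareAndSet(current, next)) {
        return next;                           // nobody changed it in between: done
      }
      // another thread won the race: retry with the fresh value
    }
  }

  public int get() { return count.get(); }

  public static void main(String[] args) throws InterruptedException {
    CasCounter c = new CasCounter();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < 10_000; i++) pool.execute(c::increment);
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println(c.get()); // 10000
  }
}
```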
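ReentrantReadWriteLock is similar to ReentrantLock (both let you choose a fair or a non-fair policy in the constructor), but it manages a pair of locks: a read lock that many threads can hold at the same time and an exclusive write lock. With fairness set to true, threads get the lock approximately in arrival order, so every competing thread has a fair chance to work with the shared object, usually at the cost of lower throughput; the non-fair (default) mode lets threads barge in and is generally faster.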
Java:
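package locks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RWLock extends ALLLock {
  public RWLock(boolean fair) {
    super();
    myLock = new ReentrantReadWriteLock(fair); // fair or non-fair policy
    wLock =  myLock.writeLock();
    //rLock =  myLock.readLock();   // not needed: counter() only writes
  }
  public void counter() {
    wLock.lock();       // acquire BEFORE try
    try {
      count++;
    } finally {
      wLock.unlock();
    }
  }
  //private Lock rLock;
  private Lock wLock;
  private ReadWriteLock myLock;
}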
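The RWLock above only uses the write lock, because counter() modifies count. The benefit of a ReadWriteLock shows when reads dominate: many threads may hold the read lock simultaneously, while the write lock stays exclusive. A minimal sketch with the hypothetical class RWCounter, guarding both sides:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: shared read lock for reads, exclusive write lock for writes
public class RWCounter {
  private final ReadWriteLock rw;
  private final Lock rLock;
  private final Lock wLock;
  private int count = 0;

  public RWCounter(boolean fair) {
    rw = new ReentrantReadWriteLock(fair);
    rLock = rw.readLock();
    wLock = rw.writeLock();
  }

  public void increment() {
    wLock.lock();       // exclusive: blocks readers and other writers
    try {
      count++;
    } finally {
      wLock.unlock();
    }
  }

  public int get() {
    rLock.lock();       // shared: several readers may be here at the same time
    try {
      return count;
    } finally {
      rLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    RWCounter c = new RWCounter(true);
    Thread[] ts = new Thread[4];
    for (int t = 0; t < ts.length; t++) {
      ts[t] = new Thread(() -> {
        for (int i = 0; i < 1000; i++) { c.increment(); c.get(); }
      });
      ts[t].start();
    }
    for (Thread t : ts) t.join();
    System.out.println(c.get()); // 4000
  }
}
```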
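StampedLock (since Java 8) hands out a "stamp" (hence the name): a long value that represents the acquired lock and must be passed back to release it. A stamp of 0 means that an attempt to acquire or convert a lock failed. StampedLock is not reentrant and has three modes: writeLock() (exclusive), readLock() (shared) and tryOptimisticRead(), a non-blocking "optimistic" read that takes no lock at all and must be checked with validate(stamp) after reading. In addition, tryConvertToWriteLock(stamp) tries to upgrade a held lock: a non-zero stamp is the green light to go ahead and modify the object, 0 is the red light (release the lock and acquire the write lock instead).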
Java:
package locks;

import java.util.concurrent.locks.StampedLock;

public class StampLock extends ALLLock {
  public StampLock( ) {
    super();
  }
  public void counter() {
    long lck = myLock.writeLock(); // exclusive lock; its stamp is needed to unlock
    try {
      count++;
    } finally {
      myLock.unlock(lck);
    }
  }
  private StampedLock myLock = new StampedLock();
}
StampedLock with lock conversion (read lock upgraded via tryConvertToWriteLock)
Java:
package locks;

import java.util.concurrent.locks.StampedLock;

public class StampOptLock extends ALLLock {
  public StampOptLock( ) {
    super();
  }
  public void counter() {
    long stamp = myLock.readLock();                  // shared read lock on count
    try {
      long ws = myLock.tryConvertToWriteLock(stamp); // try to upgrade to a write lock
      if (ws == 0L) {                                // red light: conversion not possible
        myLock.unlockRead(stamp);
        stamp = myLock.writeLock();                  // must lock exclusively
      } else {
        stamp = ws;                                  // green light: now a write stamp
      }
      count++;
    } finally {
      myLock.unlock(stamp);                          // releases whichever mode the stamp holds
    }
  }
  private StampedLock myLock = new StampedLock();
}
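The StampOptLock above upgrades a read lock with tryConvertToWriteLock(); it does not use tryOptimisticRead(). A genuinely optimistic read only pays off on the reading side. Here is a minimal sketch, modeled on the usage pattern shown in the StampedLock Javadoc; the class StampedCounter is hypothetical.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical sketch: optimistic read with fallback to a real read lock
public class StampedCounter {
  private final StampedLock sl = new StampedLock();
  private int count = 0;

  public void increment() {
    long stamp = sl.writeLock();          // exclusive write lock
    try {
      count++;
    } finally {
      sl.unlockWrite(stamp);
    }
  }

  public int get() {
    long stamp = sl.tryOptimisticRead();  // takes no lock; 0 if a writer holds the lock
    int value = count;                    // read into a local copy
    if (!sl.validate(stamp)) {            // a write happened (or was in progress): fall back
      stamp = sl.readLock();
      try {
        value = count;
      } finally {
        sl.unlockRead(stamp);
      }
    }
    return value;
  }

  public static void main(String[] args) throws InterruptedException {
    StampedCounter c = new StampedCounter();
    Thread writer = new Thread(() -> { for (int i = 0; i < 10_000; i++) c.increment(); });
    writer.start();
    while (writer.isAlive()) c.get();     // optimistic reads do not block the writer
    writer.join();
    System.out.println(c.get());          // 10000
  }
}
```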
And finally the Swing frame for the test:
Java:
import java.io.*;
import java.util.*;
import javax.swing.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
// Joe Nartca (C)
public class PerformanceTest extends JFrame {
  public PerformanceTest(String[] a) {
    count = a.length > 0?Integer.parseInt(a[0]):10000;
    JButton b0 = new JButton("SynLock");
    b0.addActionListener(e->allLock(new locks.SynLock(), "SynLock"));
    JButton b1 = new JButton("ApiLock");
    b1.addActionListener(e->allLock(new locks.ApiLock(), "APILock"));
    JButton b2 = new JButton("AtomicLock");
    b2.addActionListener(e->allLock(new locks.AtomLock(), "AtomicLock"));
    JButton b3 = new JButton("MyLock");
    b3.addActionListener(e->allLock(new locks.MeLock(), "MyLock"));
    JButton b4 = new JButton("RWLock");
    b4.addActionListener(e->allLock(new locks.RWLock(fair), "RWLock"));
    JButton b5 = new JButton("StampedLock");
    b5.addActionListener(e->allLock(new locks.StampLock(), "StampedLock"));
    JButton b6 = new JButton("StampedOptLock");
    b6.addActionListener(e->allLock(new locks.StampOptLock(), "StampOptLock"));
    JPanel pS = new JPanel();
    pS.add(b0); pS.add(b1); pS.add(b2); pS.add(b3); pS.add(b4); pS.add(b5); pS.add(b6);
  
    jta = new JTextArea(12, 60);
    jta.setEditable(false);
    JScrollPane jsp = new JScrollPane(jta);
    jsp.setAutoscrolls(true);
  
    JCheckBox cb1 = new JCheckBox("fixedPool", true);
    cb1.addItemListener(e -> {
      if (fixed) {
        fixed = false;
        cb1.setText("StealingPool");
      } else {
        fixed = true;
        cb1.setText("fixedPool");
      }
    });
    JCheckBox cb2 = new JCheckBox("Fairness", true);
    cb2.addItemListener(e -> {
      fair = !fair;
    });
    JPanel pN = new JPanel();
    pN.add(cb1); pN.add(cb2);
    add("North", pN);
    add("Center", jsp);
    add("South", pS);
    setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
    pack();
    setVisible(true);
  }
  private void allLock(locks.ALLLock all, String msg) {
    long t = all.process(count, fixed); // elapsed time in milliseconds
    jta.append(msg+". Elapsed time: "+t+" ms, Counter: "+all.getCounter()+
               (fixed ? " (fixedPool)" : " (stealingPool)")+
               (all instanceof locks.RWLock? ", Fairness: "+fair:"")+"\n");
  }
  private int count;
  private JTextArea jta;
  private boolean fixed = true, fair = true;
  public static void main(String... a) {
    SwingUtilities.invokeLater(() -> new PerformanceTest(a)); // build the Swing UI on the event dispatch thread
  }
}
[Screenshot: PerformanceTest results for all lock types]
 