HƯỚNG DẪN Advanced Parallel Programming in JAVA

Joe

Thành viên VIP
21/1/13
2,969
1,311
113
Hi

This article is an extension of my Posting Parallel Programming and this time I am going a bit into details with JAVA and JAVA specific features: ForkJoinPool and ForkJoinTask.

First of all: the following images give you some words about Parallelism and MuliTasking/MultiThreading:

parallel.png

and about ForkJoinTask

forkJoinTask.png

The word "fork" comes from an "eating" tableware
. To fork is to separate something into two. ONE into TWO like a twin. And that signifies that the new TWO are identical. An exact copy of each other.

ADVANCED PARALLEL PROGRAMMING

JAVA offers you two ways to work with parallelism on the high level of an OOPL: Pool, Task and Thread with Concurrency. Nevertheless you must be aware about the process you are working with: parallelism functions only and only if the sub-processes are fully independent from each other. Otherwise the sub-processes work just like threads or tasks which have to be waited for some dependencies (waiting due to synchronization) and the parallelism becomes partial sequentialism. A real waste of "multicores" CPU. To show you the significance of Process-Independence let work with a SEARCH for a STRING in any or more file(s) in a big directory with sub-directories.

directory.png

And the search sub-processes are fully independent from the parents

directory_1.png

I show you two different versions of Parallelism:
  1. as a ForkJoinTask (i.e. RecursiveTask): Seach.java
  2. as a Callable using Lambda Expression: SearchLambda.java

Search.java
PHP:
import java.io.*;
import java.util.*;
import java.nio.file.Files;
import java.util.concurrent.*;
// Joe Nartca
public class Search {
  public static void main(String... argv) throws Exception {
    if (argv.length != 2) {
      System.out.println("usage: java Search DirName SearchPattern");
      System.exit(0);
    }
    List<String> list = Collections.synchronizedList(new ArrayList<String>());
    long beg = System.nanoTime(); // starting the clock
    ForkJoinPool.commonPool().submit(new DirectorySearch(argv[0], argv[1], list)).join();
    beg = System.nanoTime() - beg; // stop the clock
    System.out.println("Search \""+argv[1]+"\" in Directory:"+argv[0]+
                       "\nElapsed time (microSec.):"+((double)beg/1000)+
                       "\nTotal files:"+countFiles(argv[0])+"\n");
    list.forEach(fName -> System.out.println("- "+fName));
  }
  //
  private static long countFiles(String dir) {
    long n = 0;
    try {
      File[] allFiles = new File(dir).listFiles();
      for (File f:allFiles) if (f.isFile()) ++n;
      else n += countFiles(dir+File.separator+f.getName());
      return n;
    } catch (Exception ex) { }
    return -1;
  }
  // Recursive task for the thieves: WorkStealing with ForkJoinPool
  private static class DirectorySearch extends RecursiveTask<List<String>> {
    public DirectorySearch(String dir, String pattern, List<String> list) {
      this.pattern = pattern;
      this.list = list;
      this.dir = dir;
    }
    private List<String> list;
    private String dir, pattern;
    protected List<String> compute() {
      try {
        File[] allFiles = new File(dir).listFiles();
        List<DirectorySearch> tasks = new ArrayList<>();
        for (File f : allFiles) {
           String name = f.getName();
           if (f.isFile()) {
             if ((new String(Files.readAllBytes(f.toPath()))).indexOf(pattern) >= 0 ||
                 name.indexOf(pattern) >= 0) list.add(name);
           } else { // recursive Directory MRA...
             DirectorySearch ds = new DirectorySearch(dir+File.separator+name, pattern, list);
             ds.fork();
             tasks.add(ds);
           }
        }
        for (DirectorySearch ds : tasks) ds.join();
        return list;
       } catch (Exception e) {
        e.printStackTrace();
       }
       return null;
    }
  }
}
and the SearchLambda.java
PHP:
public class SearchLambda {
  public static void main(String... argv) throws Exception {
    if (argv.length != 2) {
      System.out.println("usage: java Search DirName SearchPattern");
      System.exit(0);
    }
    List<String> list = Collections.synchronizedList(new ArrayList<String>());
    long beg = System.nanoTime(); // starting the clock
    ForkJoinPool.commonPool().submit(()->{
      DirectorySearch(argv[0], argv[1], list);
      return null;
    }).join();
    beg = System.nanoTime() - beg; // stop the clock
    System.out.println("Search \""+argv[1]+"\" in Directory:"+argv[0]+
                       "\nElapsed time (microSec.):"+((double)beg/1000)+
                       "\nTotal files:"+countFiles(argv[0])+"\n");
    list.forEach(fName -> System.out.println("- "+fName));
  }
  //
  private static long countFiles(String dir) {
    long n = 0;
    try {
      File[] allFiles = new File(dir).listFiles();
      for (File f:allFiles) if (f.isFile()) ++n;
      else n += countFiles(dir+File.separator+f.getName());
      return n;
    } catch (Exception ex) { }
    return -1;
  }
  // Recursive task for the thieves: WorkStealing with ForkJoinPool
  private static void DirectorySearch(String dir, String pattern, List<String> list) {
    try {
      File[] allFiles = new File(dir).listFiles();
      List<ForkJoinTask<Void>> tasks = new ArrayList<>();
      for (File f : allFiles) {
         String name = f.getName();
         if (f.isFile()) {
           if ((new String(Files.readAllBytes(f.toPath()))).indexOf(pattern) >= 0 ||
               name.indexOf(pattern) >= 0)
           list.add(name);
         } else {
           tasks.add(ForkJoinPool.commonPool().submit(()->{
             DirectorySearch(dir+File.separator+name, pattern, list);
             return null;
           }));
         }
       }
       for (ForkJoinTask<Void> t : tasks) t.join();
     } catch (Exception e) {
      e.printStackTrace();
     }
  }
}
If you successfully compiled the 2 files you could get such following results:
Code:
C:\links\java\Sort>javac -g:none -d ./classes Search*.java

C:\links\java\Sort>java Search C:/links/java DirectorySearch
Search "DirectorySearch" in Directory:C:/links/java
Elapsed time (microSec.):418773.9
Total files:743

- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- Search.java
- SearchLambda.java

C:\links\java\Sort>java SearchLambda C:/links/java DirectorySearch
Search "DirectorySearch" in Directory:C:/links/java
Elapsed time (microSec.):576396.9
Total files:743

- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- Search.java
- SearchLambda.java

C:\links\java\Sort>java SearchLambda C:/links/java DirectorySearch
Search "DirectorySearch" in Directory:C:/links/java
Elapsed time (microSec.):579400.1
Total files:743

- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- Search.java
- SearchLambda.java

C:\links\java\Sort>java Search C:/links/java DirectorySearch
Search "DirectorySearch" in Directory:C:/links/java
Elapsed time (microSec.):412899.0
Total files:743

- Search.java
- SearchLambda.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class

C:\links\java\Sort>
It's interesting that the Search with a "real" Task works faster than the Lambda-Search. The reason hides in the invisible works: The Callable must be created and then instantiated before it can be executed.

Conclusion: Don't think that all the newfangled coding ways are better than the classic ways. They looks neat, clean and short. But the works had to be carried behind your back. And that always takes time: We, American, used to say: "There ain't no such thing as a free lunch"
 
Sửa lần cuối:
  • Like
Reactions: quydtkt

Joe

Thành viên VIP
21/1/13
2,969
1,311
113
Note: To check for the completion of the running tasks inside the ForkJoinPool you have two choices:

1) Check for the active tasks/threads as following
PHP:
while (ForkJoinPool.commonPool().getActiveThreadCount() > 0); // do NOTHING while some tasks/threads are still running
2) wait for the completion of ALL running tasks/threads
PHP:
...
List<ForkJoinTask<Void>> tasks = new ArrayList<>();
...
tasks.add(ForkJoinPool.commonPool().submit(()->{
             DirectorySearch(dir+File.separator+name, pattern, list);
             return null;
           }));
...
for (ForkJoinTask<Void> t : tasks) t.join(); // wait for Completion
Both ways work perfectly. I myself prefer the waiting for ZERO active threads.
Joe
 
  • Like
Reactions: quydtkt

Joe

Thành viên VIP
21/1/13
2,969
1,311
113
(cont.)
As I mentioned about the "secret works" behind your back
The Callable must be created and then instantiated before it can be executed.
let do the works ourselves with the API ForkJoinTask<?> by an implementation as following: SearchTask,java. With this implementation I show you how an implementation of ForkJoinTask<?> returns a NON-Void Object which is a List<String>

SearchTask.java
PHP:
// Joe Nartca
public class SearchTask {
  public static void main(String... argv) throws Exception {
    if (argv.length != 2) {
      System.out.println("usage: java Search DirName SearchPattern");
      System.exit(0);
    }
    long beg = System.nanoTime(); // starting the clock
    ForkJoinTask<List<String>> fjt = ForkJoinPool.commonPool().
                                                  submit(new DirectorySearch(argv[0], argv[1]));
    beg = System.nanoTime() - beg; // stop the clock
    System.out.println("Search \""+argv[1]+"\" in Directory:"+argv[0]+
                       "\nElapsed time (microSec.):"+((double)beg/1000)+
                       "\nTotal files:"+countFiles(argv[0])+"\n");
    // retrieve the result and wait for its completion
    List<String> list = list = fjt.get();
    list.forEach(fName -> System.out.println("- "+fName));
  }
  //
  private static long countFiles(String dir) {
    long n = 0;
    try {
      File[] allFiles = new File(dir).listFiles();
      for (File f:allFiles) if (f.isFile()) ++n;
      else n += countFiles(dir+File.separator+f.getName());
      return n;
    } catch (Exception ex) { }
    return -1;
  }
  // Recursive task for the thieves: WorkStealing with ForkJoinPool
  private static class DirectorySearch extends ForkJoinTask<List<String>> {
    public DirectorySearch(String dir, String pattern) {
      this.pattern = pattern;
      this.dir = dir;
    }
    private String dir, pattern;
    private List<String> list;
    public boolean exec() {
      try {
        list = new ArrayList<>();
        File[] allFiles = new File(dir).listFiles();
        List<ForkJoinTask<List<String>>> tasks = new ArrayList<>();
        for (File f : allFiles) {
           String name = f.getName();
           if (f.isFile()) {
             if ((new String(Files.readAllBytes(f.toPath()))).indexOf(pattern) >= 0 ||
                 name.indexOf(pattern) >= 0)
             list.add(name);
           } else {
             tasks.add(ForkJoinPool.commonPool().submit(
               new DirectorySearch(dir+File.separator+name, pattern)));
           }
         }
         // gather the search results and wait if necessary
         for (ForkJoinTask<List<String>> t : tasks) {
           List<String> lst = t.get();
           for (String S:lst) list.add(S);
         }
       } catch (Exception e) {
        e.printStackTrace();
       }
       return true;
    }
    // Abstract Implementation: this is the returned value
    public List<String> getRawResult() {
      return list;
    }
    // Abstract Implementation: this is the set value
    protected void setRawResult(List<String> list) {
      this.list = list;
    }
  }
}
You see the mandatory implementation of 2 abstract methods getRawResult(), setRawResult() and how to build the returned object List<String>
PHP:
//  Expected returned value is List<String>
private static class DirectorySearch extends ForkJoinTask<List<String>> {
     ...
    private List<String> list; // <-- declared the returned object
    
    public boolean exec() {
      try {
        list = new ArrayList<>(); // <<-- instantiated HERE
        ...
        // gather the search results and wait if necessary
        for (ForkJoinTask<List<String>> t : tasks) {
           List<String> lst = t.get(); // get & wait
           for (String S:lst) list.add(S);
        }
      } catch (Exception ex) {
        ...
      }
     return true;
  }  
  // Abstract Implementation: this is the returned value
  public List<String> getRawResult() {
      return list;
  }
  // Abstract Implementation: this is the set value
  protected void setRawResult(List<String> list) {
      this.list = list;
   }
}
and the result (compared to the 2 previous implementations) will
stun you
Code:
C:\links\java\Sort>javac -g:none -d ./classes SearchTask.java

C:\links\java\Sort>java SearchTask c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):8899.1
Total files:746

- APP.txt
- Search.java
- SearchLambda.java
- SearchTask.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchTask$DirectorySearch.class
- SearchTask.class

C:\links\java\Sort>java Search c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):424023.1
Total files:746

- APP.txt
- Search.java
- SearchLambda.java
- SearchTask.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchTask$DirectorySearch.class
- SearchTask.class

C:\links\java\Sort>java SearchLambda c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):459161.5
Total files:746

- APP.txt
- Search$DirectorySearch.class
- Search.class
- Search.java
- SearchLambda.class
- SearchLambda.java
- SearchTask$DirectorySearch.class
- SearchTask.java
- SearchTask.class

C:\links\java\Sort>
8899.1 microSeconds versus 424023.1 (RecursiveTask) and 459161.5 (Lambda). BUT: if you modify the codes as following you could see the normalcy coming back: NO Stunning scene!

PHP:
    // start and wait for the result.
    List<String> list = ForkJoinPool.commonPool()
                                    .submit(new DirectorySearch(argv[0], argv[1]))
                                    .get();
Exercise for you: TRY to implement
Search.java with RecursiveTask<List<String>> and SearchLambda.java with the return of List<String> ;) The frame of Search.java and SearchLambda is already set. What you need to do is the modify the codes correctly...and the try makes the master :cool: Whoever you would become, Master or Apprentice, I will show you the solutions in the NEXT chapter
 
Sửa lần cuối:

Joe

Thành viên VIP
21/1/13
2,969
1,311
113
(cont.)
Have you solved the "exercise"? No matter if you haven't done it. As I have already said that a task always gives a feedback (or a return value). Even when there is no need for a feedback Object Void is used instead. To solve the exercise you need only to "adjust" the object Void to the returned object you want to have. In this case it's the List<String> containing the file names. And here are the solutions:

Search.java and we name the modification as SearchX.java
PHP:
  public static void main(String... argv) throws Exception {
    ...
    long beg = System.nanoTime(); // starting the clock
    ForkJoinTask<List<String>> fjt = ForkJoinPool.commonPool()
                                                 .submit(new DirectorySearch(argv[0], argv[1]));
    beg = System.nanoTime() - beg; // stop the clock
    ...
    // retrieve the result and wait for its completion
    List<String> list = fjt.get();
    list.forEach(fName -> System.out.println("- "+fName));
  }
  ...
  // parameter List<String> is turned to local variable as the returned object
  private static class DirectorySearch extends RecursiveTask<List<String>> {
    public DirectorySearch(String dir, String pattern) {
      this.pattern = pattern;
      this.dir = dir;
    }
    private List<String> list;      //<<---
    private String dir, pattern;
    protected List<String> compute() {
      try {
        list = new ArrayList<String>(); //<<-- instantiated HERE
        ...
        // wait for the completion of all forks
        for (DirectorySearch ds : tasks) {
          List<String> lst = ds.get();
          for (String S:lst) list.add(S);
        }
        return list; // <<-------return HERE
       } catch (Exception e) {
        e.printStackTrace();
       }
       return null; // <<-- null only in case of Exception
    }
  }
The same is done with SearchLambda.java to SearchLambdaX.java
PHP:
  public static void main(String... argv) throws Exception {
    ...
    long beg = System.nanoTime(); // starting the clock
    ForkJoinTask<List<String>> fjt = ForkJoinPool.commonPool().submit(()->{
      return DirectorySearch(argv[0], argv[1]); // <<--return the method
    });
    beg = System.nanoTime() - beg; // stop the clock
    ...
    // wait and fetch the result
    List<String> list = fjt.get();
    list.forEach(fName -> System.out.println("- "+fName));
  }
  ...
  // Return List<String>
  private static List<String> DirectorySearch(String dir, String pattern) {
    try {
      List<String> list = new ArrayList<>(); // <<--- instantiated HERE
      File[] allFiles = new File(dir).listFiles();
      List<ForkJoinTask<List<String>>> tasks = new ArrayList<>();
       ...
      // wait for the completion of all forks
      for (ForkJoinTask<List<String>> t : tasks) {
        List<String> lst = t.get();
        for (String S:lst) list.add(S);
      }
      return list; //<<--- return the List<String>
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null; //<<-- null if failed
  }
}
And here are the results which are much better than the previous versions (even 3 more files are included):
Code:
C:\links\java\Sort>java Search c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):437097.8
Total files:751

- APP.txt
- Search.java
- SearchLambda.java
- SearchLambdaX.java
- SearchTask.java
- SearchX.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchLambdaX.class
- SearchTask$DirectorySearch.class
- SearchTask.class
- SearchX$DirectorySearch.class
- SearchX.class

C:\links\java\Sort>java SearchX c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):9222.9
Total files:751

- APP.txt
- Search.java
- SearchLambda.java
- SearchLambdaX.java
- SearchTask.java
- SearchX.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchLambdaX.class
- SearchTask$DirectorySearch.class
- SearchTask.class
- SearchX$DirectorySearch.class
- SearchX.class

C:\links\java\Sort>java SearchLambda c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):505811.2
Total files:751

- APP.txt
- Search.java
- SearchLambda.java
- SearchLambdaX.java
- SearchTask.java
- SearchX.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchLambdaX.class
- SearchTask$DirectorySearch.class
- SearchTask.class
- SearchX$DirectorySearch.class
- SearchX.class

C:\links\java\Sort>java SearchLambdaX c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):30350.2
Total files:751

- APP.txt
- Search.java
- SearchLambda.java
- SearchLambdaX.java
- SearchTask.java
- SearchX.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchLambdaX.class
- SearchTask$DirectorySearch.class
- SearchTask.class
- SearchX$DirectorySearch.class
- SearchX.class

C:\links\java\Sort>java SearchTask c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):11988.6
Total files:751

- APP.txt
- Search.java
- SearchLambda.java
- SearchLambdaX.java
- SearchTask.java
- SearchX.java
- Search$DirectorySearch.class
- Search.class
- SearchLambda.class
- SearchLambdaX.class
- SearchTask$DirectorySearch.class
- SearchTask.class
- SearchX$DirectorySearch.class
- SearchX.class

C:\links\java\Sort>
As you see, the results depend on the way how the tasks are launched and how the results are expected. A direct wait of a result is usually a waste of time at the expense of Parallelism:
PHP:
// direct wait for a result
ForkJoinPool.commonPool().submit(new DirectorySearch(argv[0], argv[1], list)).join();
// wait only for the result when it is NEEDED
ForkJoinTask<List<String>> fjt = ForkJoinPool.commonPool() .submit(new DirectorySearch(argv[0], argv[1]));
...
List<String> list = fjt.get();
 

Joe

Thành viên VIP
21/1/13
2,969
1,311
113
An additional note:

Because Admin quydtkt has posted not long ago a series about the newfangled Programming Techniques of Java 8: the stream, filter, forEach, etc. I show you here how one could shorten the Content-Search using the new Technique:

SearchS.java
PHP:
public class SearchS {
  public static void main(String... argv) throws Exception {
    if (argv.length != 2) {
      System.out.println("usage: java Search DirName SearchPattern");
      System.exit(0);
    }
   // this is the new feature of JAVA 8 using Stream, find and filter
    long beg = System.nanoTime(); // starting the clock
    Stream<Path> s = Files.find(Paths.get(argv[0]), Integer.MAX_VALUE,
                (path, basicFileAttributes) -> {
                    File f = path.toFile();
                    String X = null;
                    try {
                      X = new String(Files.readAllBytes(path));
                    } catch (Exception ex) { }
                    return f.getName().indexOf(argv[1]) >= 0 || X != null && X.indexOf(argv[1]) >= 0;
                });
    beg = System.nanoTime() - beg; // stop the clock
    System.out.println("Search \""+argv[1]+"\" in Directory:"+argv[0]+
                       "\nElapsed time (microSec.):"+((double)beg/1000)+
                       "\nTotal files:"+countFiles(argv[0])+"\n");
    s.forEach(System.out::println);
  }
  private static long countFiles(String dir) {
    long n = 0;
    try {
      File[] allFiles = new File(dir).listFiles();
      for (File f:allFiles) if (f.isFile()) ++n;
      else n += countFiles(dir+File.separator+f.getName());
      return n;
    } catch (Exception ex) { }
    return -1;
  }
}
and let modify the SearchX.java to cope with the newfangled technique.
PHP:
           if (f.isFile()) {
             if ((new String(Files.readAllBytes(f.toPath()))).indexOf(pattern) >= 0 ||
               //name.indexOf(pattern) >= 0) list.add(name);
               name.indexOf(pattern) >= 0) list.add(f.getAbsolutePath()); // absolute path
           } else { // recursive Directory MRA...
              ...
           }
and let check the results
Code:
C:\links\java\Sort>javac -g:none -d ./classes SearchS.java

C:\links\java\Sort>javac -g:none -d ./classes SearchX.java

C:\links\java\Sort>java SearchS c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):44350.8
Total files:754

c:\links\java\Sort\APP.txt
c:\links\java\Sort\classes\Search$DirectorySearch.class
c:\links\java\Sort\classes\Search.class
c:\links\java\Sort\classes\SearchLambda.class
c:\links\java\Sort\classes\SearchLambdaX.class
c:\links\java\Sort\classes\SearchTask$DirectorySearch.class
c:\links\java\Sort\classes\SearchTask.class
c:\links\java\Sort\classes\SearchX$DirectorySearch.class
c:\links\java\Sort\classes\SearchX.class
c:\links\java\Sort\Search.java
c:\links\java\Sort\SearchLambda.java
c:\links\java\Sort\SearchLambdaX.java
c:\links\java\Sort\SearchTask.java
c:\links\java\Sort\SearchX.java

C:\links\java\Sort>java SearchX c:/links/java DirectorySearch
Search "DirectorySearch" in Directory:c:/links/java
Elapsed time (microSec.):9105.0
Total files:754

- c:\links\java\Sort\APP.txt
- c:\links\java\Sort\Search.java
- c:\links\java\Sort\SearchLambda.java
- c:\links\java\Sort\SearchLambdaX.java
- c:\links\java\Sort\SearchTask.java
- c:\links\java\Sort\SearchX.java
- c:\links\java\Sort\classes\Search$DirectorySearch.class
- c:\links\java\Sort\classes\Search.class
- c:\links\java\Sort\classes\SearchLambda.class
- c:\links\java\Sort\classes\SearchLambdaX.class
- c:\links\java\Sort\classes\SearchTask$DirectorySearch.class
- c:\links\java\Sort\classes\SearchTask.class
- c:\links\java\Sort\classes\SearchX$DirectorySearch.class
- c:\links\java\Sort\classes\SearchX.class

C:\links\java\Sort>
If you love "fashion" you are with the newfangled codes. But if your app needs performance you should go with the classic old-fashioned way.
 
Sửa lần cuối: