Reuse CompletableFuture?

Discussion in 'Java Update' started by pinokio, 4/1/16.

  1. pinokio

    pinokio New Member

    Hi all,
    I am currently working on a Java client library that sends many asynchronous requests to a remote server and returns a Future-like object to the caller (the future is completed when the client receives the response from the server). In my initial implementation, the send function looks like this:
    Code:
    private CompletableFuture send(Request request) {
        CompletableFuture future = new CompletableFuture<>();
        request.setFuture(future);
        getSession().send(request); // Issue an async IO here
        return request.getFuture();
    }
    Here a new CompletableFuture is allocated on every call, which puts a lot of pressure on the Java GC if send() is called frequently.
    Is there a better way to use CompletableFuture, such as a CompletableFuture pool?
    Has anybody compared CompletableFuture with equivalents such as the Future types in Netty/Akka/Finagle?
    Thank you.
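On the pooling idea: a minimal sketch, using plain JDK classes only, of why a completed CompletableFuture is hard to hand out again. A second complete() call on the same instance is ignored, and any dependent stages have already run. The class name OneShotDemo is made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: shows that complete() takes effect just once per instance.
class OneShotDemo {
    // Tries to complete the same future twice and reports what happened.
    static String run() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("response-1");   // transitions to completed
        boolean second = future.complete("response-2");  // ignored: already completed
        return first + "," + second + "," + future.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,response-1
    }
}
```

Because of this one-shot behavior, a pooled instance would still carry the previous request's result when handed to the next caller.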
  2. Joe

    Joe Moderator

    If I understand you correctly, your approach is UN-Kalashnikov:
    Communication between two different partners should be made as simple as possible. Invent your own technique to deal with your problem.
    CompletableFuture is NEW and EVENT-driven. That means it suits things like a CancelAction, the termination of a DownloadAction, etc. All of that usually happens within a reachable environment (i.e. local).
    ----
    re-edited
    An example for you: (based on my Challenge to you and the others)
    Code:
    import java.util.*;
    import java.util.concurrent.*;

    import javax.swing.JOptionPane;
    import javax.swing.UIManager;
    // Joe Nartca - CongdongJava.com
    public class QA1 {
        public QA1() {
            pool = new ForkJoinPool(10); // 10 for multiple queues
            asso = ass.toArray(new AE[ass.size()]);
        }
        AE[] asso;
        ForkJoinPool pool;
        public AE find(String key) {
            final byte[] k = key.getBytes();
            CompletableFuture<AE> fc = CompletableFuture.supplyAsync(() -> {
                OUT: for (int x = 0; x < asso.length; ++x) {
                    if (asso[x].keyLength == k.length && asso[x].key[0] == k[0]) {
                        for (int l = 1; l < k.length; ++l) if (asso[x].key[l] != k[l]) continue OUT;
                        return asso[x];
                    }
                }
                return null;
            }, pool);
            // wait for termination
            try {
                return fc.get();
            } catch (Exception ex) { }
            return null;
        }
        private static class AE {
            public byte[] key;
            public short keyLength; // same as key.length
            public int objLength;
        }
        static ArrayList<AE> ass = new ArrayList<AE>();
        public static void main(String[] argv) throws Exception {
            UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName());
            String inp = JOptionPane.showInputDialog("key0,key1,...,key99999");
            if (inp == null) {
                JOptionPane.showMessageDialog(null, "ERROR. Usage: java QuickAccess keyX,keyY,keyZ,...", "Usage", JOptionPane.INFORMATION_MESSAGE);
                return;
            }

            String[] a = inp.split(",");
            int count = a.length;
            for (int i = 0; i < count; ++i)
                a[i] = a[i].trim().toLowerCase();
            for (int i = 0; i < 100000; ++i) { // generate 100K keys
                AE ae = new AE();
                ae.key = ("key" + i).getBytes();
                ae.keyLength = (short) ae.key.length;
                ae.objLength = 5;
                ass.add(ae);
            }

            QA1 qa = new QA1();
            if (count > 0) {
                AE[] ae = new AE[count];
                long beg = System.nanoTime();
                // >>> invoking the find-method <<<
                for (int i = 0; i < count; ++i)
                    ae[i] = qa.find(a[i]);

                for (int i = 0; i < count; ++i) {
                    if (ae[i] == null)
                        System.out.println("Key for a[" + i + "] is NOT found.");
                    else
                        System.out.println("Key for a[" + i + "]=" + new String(ae[i].key));
                }

                System.out.println("t=" + (System.nanoTime() - beg) + " nSec. for " + count + " Search&Access" + (count > 1 ? "es" : ""));
            } else
                JOptionPane.showMessageDialog(null, "ERROR. Usage: java QuickAccess keyX,keyY,keyZ,...", "Usage", JOptionPane.INFORMATION_MESSAGE);
        }
    }
    Pinokio, thank you for the hint about CompletableFuture. I had really forgotten it. The code here is almost as fast as your HashMap :D (average: 12 ms)
  3. pinokio

    pinokio New Member

    If the approach is UN-Kalashnikov, how can we make it more Kalashnikov-friendly?
    The problem is simple: the user sends many (hundreds of) requests simultaneously over a SocketChannel (non-blocking mode) and defines a callback to run when the response associated with each request arrives.
    E.g.: send (ASK_NAME, "Who are you?"), and when "Joe" is received, print "Hello Joe".
    Send (ASK_AGE, "How old are you?"), and when "18" is received, print "18 year old".
    But what if the two send commands are issued asynchronously and the second response, "18", arrives before "Joe"? We must run the "Hello X" callback for the ASK_NAME request and the "Y year old" callback for the ASK_AGE request. These callbacks are implemented with something like CompletableFuture. I cannot figure out a better way to keep it simple.
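The out-of-order case above can be sketched with a map from request id to pending future. This is only a sketch under assumptions: the class CorrelatingClient, the methods send and onResponse, and the idea that every response carries the id of its request are all hypothetical, and the actual SocketChannel I/O is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client: every name here is illustrative, not taken from a real library.
class CorrelatingClient {
    private final AtomicInteger nextId = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a future under a fresh request id; the real network write is omitted.
    CompletableFuture<String> send(String question) {
        int id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    // Called by the (omitted) I/O layer when a response carrying this id arrives.
    void onResponse(int id, String payload) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.complete(payload); // runs the callbacks attached by the user
        }
    }

    // Simulates two requests whose responses arrive in reverse order.
    static List<String> demo() {
        List<String> printed = new ArrayList<>();
        CorrelatingClient client = new CorrelatingClient();
        client.send("Who are you?").thenAccept(name -> printed.add("Hello " + name));      // id 1
        client.send("How old are you?").thenAccept(age -> printed.add(age + " year old")); // id 2
        client.onResponse(2, "18");  // the age answer arrives first
        client.onResponse(1, "Joe");
        return printed;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println); // "18 year old", then "Hello Joe"
    }
}
```

Each callback fires for its own request regardless of arrival order, because the lookup is by id rather than by position in the stream.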
  4. Joe

    Joe Moderator

    Correction:
    Code:
    pool = ForkJoinPool.commonPool(); // much better. Reason: avoiding Shutdown
    Then think about the Chat-Implementation...
  5. pinokio

    pinokio New Member

    ForkJoinPool?? Hmm, I am sorry, but I don't know how to get a CompletableFuture from a ForkJoinPool. In my implementation, I would like to return a CompletableFuture for each request sent, so that the user can call thenXXX with whatever handler he wants to run on the response to that request. In addition, how can we release this CompletableFuture back to the ForkJoinPool once the user has finished with it?

    It is not as simple as chatting. For chatting, you just display every response received, regardless of its type; the handler behavior is fixed. But I am writing something like a library or API that does not know what the user wants to do with the response. CompletableFuture lets the user do whatever he wants through the thenXXX functions.
    How about this scenario: there are two types of request to the OODB: INIT and COUNT. My client library for the OODB server must provide the two corresponding functions.
    Now the user wants:
    Code:
    1. Send INIT -> then (()->doFirst()); // Send the first INIT, then call doFirst when the response to the first INIT request is received.
    2. Send INIT -> then (()->doSecond()); // Send the second INIT, then call doSecond when the response to the second INIT request is received.
    3. Send COUNT -> then (()->doCount()); // Send the first COUNT, then call doCount when the response to the first COUNT request is received.
    
    Then the server returns the response data in this order: INIT (for the 2nd request), INIT (for the 1st request), COUNT. Hence the handlers execute in the order doSecond(), doFirst(), doCount(). Remember that as API developers we do not know the exact handlers in advance. All we know is that there is a handler associated with each request sent, and the user must define it for each request.
    Clearly a chat implementation is not enough, I think.
    I hope I have explained this clearly enough. Sorry for my poor writing.
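The INIT/INIT/COUNT scenario can be modelled without any server. In this rough sketch the three futures stand in for what the library would return from three send calls, and the "server order" is simulated by completing them by hand. HandlerOrderDemo and the string labels are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative model of the INIT/INIT/COUNT scenario; no real server is involved.
class HandlerOrderDemo {
    static List<String> run() {
        List<String> calls = new ArrayList<>();

        // What the library would hand back from three send() calls.
        CompletableFuture<Void> firstInit = new CompletableFuture<>();
        CompletableFuture<Void> secondInit = new CompletableFuture<>();
        CompletableFuture<Void> count = new CompletableFuture<>();

        // User side: handlers the library knows nothing about.
        firstInit.thenRun(() -> calls.add("doFirst"));
        secondInit.thenRun(() -> calls.add("doSecond"));
        count.thenRun(() -> calls.add("doCount"));

        // Library side: complete each future as its response arrives.
        secondInit.complete(null); // response for the 2nd INIT comes first
        firstInit.complete(null);
        count.complete(null);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [doSecond, doFirst, doCount]
    }
}
```

The library never needs to see the handlers; it only has to complete the right future when the matching response arrives.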
  6. Nancru

    Nancru CongDongJava Project Leader Staff Member

    So, pinokio wants this: when a user sends a request to the server, the server returns a response, and the user can do whatever he wants with it until he closes or terminates the connection (response); then the server returns the CompletableFuture to the pool once the user closes or terminates it.

    If that is the case, I think you are overthinking it, pinokio. OODB is simple: the server returns data as serialized objects. They are processed on the user's own computer, mobile, or whatever it is. The connection is closed at the moment the server has transferred all the requested data to the client.

    Giving users permission to keep playing with the connections is dangerous and complex.
  7. Joe

    Joe Moderator

    My friend Nancru,
    I love your reasoning... ;)

    My friend Pinokio,
    Kalashnikov said, "Simple things are useful, complex things are usually unnecessary."
    Code:
    ForkJoinPool pool = ForkJoinPool.commonPool();
    // doFirst() returns a String
    CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> { ...your doFirst() code... }, pool);
    try {
        System.out.println("Got a Return:" + c1.get());
    } catch (Exception ex) { ex.printStackTrace(); }
    ...
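A self-contained variant of the snippet above, as a sketch only: doFirst() here is a placeholder that returns a fixed string, and a thenApply continuation is attached rather than calling get() straight away. The join() at the end is there just so the demo can show the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class SupplyAsyncDemo {
    // Stand-in for the user's doFirst(); the body is a placeholder.
    static String doFirst() {
        return "first";
    }

    static String run() {
        // Run doFirst() on the common pool; no pool shutdown is needed afterwards.
        CompletableFuture<String> c1 =
            CompletableFuture.supplyAsync(SupplyAsyncDemo::doFirst, ForkJoinPool.commonPool());
        // thenApply registers a continuation instead of blocking the caller.
        CompletableFuture<String> message = c1.thenApply(result -> "Got a Return:" + result);
        return message.join(); // blocks only so the demo can return the text
    }

    public static void main(String[] args) {
        System.out.println(run()); // Got a Return:first
    }
}
```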
  8. pinokio

    pinokio New Member

    Thanks, Joe and Nancru. By request and response I mean the data being sent and received, not the connection. Many requests are sent over one connection (one channel). The timing and order of the response data paired with each request are unpredictable, which is where CF comes into play. Btw, thanks Joe for the hint about supplyAsync; how negligent of me not to have considered it.
