Queueing. Persistency - Stateful - Stateless

Joe

21/1/13
(Continued from HERE)

Queueing Technique and Implementation - Persistency

You may have noticed the Runnable ModelWorker in the previous session. Yes, this module is responsible for the Queueing and the Persistency on the Subscriber's side. But first of all: where is the Queueing on the Publisher's side? Before I go into the details of ModelWorker, let me show you the simple Queueing technique on the Publisher's side.

The Publisher, in this case the MessageSender, works with a synchronized List msgLst as the queue for the messages which need to be published (sent):
Java:
...
  public synchronized void broadcast(byte[] msg) {
    msgLst.add(msg);
  }
  ...
  private volatile List<byte[]> msgLst = Collections.synchronizedList(new ArrayList<byte[]>());
The method broadcast(byte[] msg) queues the request at the end of msgLst (FIFO - First In First Out), while the loop in the run() method removes the message at the head of the list until NOTHING remains in msgLst.
Java:
  while (msgLst.size() > 0) {
    byte[] msg = msgLst.remove(0);
    mcs.send(new DatagramPacket(msg, msg.length, group, P.port)); // publish the msg
  }
This FIFO-Queueing technique works almost instantly. As soon as a new message is queued in msgLst, the processing loop sees the change on its next check. The reason is that every call on the list returned by Collections.synchronizedList locks the same monitor, and that lock also guarantees visibility between the threads. The keyword VOLATILE only covers the reference msgLst itself, not the content of the list, so it is not what makes the change visible. The synchronization work is reduced to a minimum thanks to the useful Collections.synchronizedList API of java.util.

Due to the "instant reaction" there is no need for Persistency here. Neither Stateful, nor Stateless. Another unspoken reason is that Persistency depends greatly on the application design, not on the message itself. Normally, any change (update, add, delete) of the data (e.g. data of a Database) must be fully completed before any message about the change can be published. The Persistency is HERE, in the completion of the changes in the Database. If a message gets lost, the data in the database are still valid for the others when they come later. In such a case the loss is irrelevant.
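As a minimal sketch of the publisher-side FIFO queue described above: the class name PublisherQueueSketch, the method drain() and the Consumer that stands in for mcs.send(...) are assumptions made for illustration only and are not part of the original MessageSender.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the queueing part of MessageSender.
final class PublisherQueueSketch {
  // Every call on the synchronized wrapper takes the same lock, which
  // also makes newly added messages visible to the draining thread.
  private final List<byte[]> msgLst =
      Collections.synchronizedList(new ArrayList<byte[]>());

  // Queue a message at the tail of the list (FIFO).
  void broadcast(byte[] msg) {
    msgLst.add(msg);
  }

  // Remove messages from the head until the list is empty.
  // Safe for ONE draining thread only; 'sink' replaces mcs.send(...).
  int drain(Consumer<byte[]> sink) {
    int sent = 0;
    while (!msgLst.isEmpty()) {
      sink.accept(msgLst.remove(0));
      sent++;
    }
    return sent;
  }

  public static void main(String[] args) {
    PublisherQueueSketch q = new PublisherQueueSketch();
    q.broadcast("first".getBytes());
    q.broadcast("second".getBytes());
    int n = q.drain(m -> System.out.println(new String(m)));
    System.out.println(n + " messages sent");
  }
}
```

The check-then-remove loop is only safe because a single thread drains the list. With several draining threads, or if the sender should block while the queue is empty, a java.util.concurrent.LinkedBlockingQueue would be one possible alternative.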

On the Subscriber's side it is more complex than on the Publisher's side. The subscriber, in this case the MessageListener, always gets messages from "nowhere", whether it needs them or not. The messages are simply queued. If the Subscriber has to sort out the right messages itself, scanning all the queued messages could take a lot of time and cause a huge jam in the queue. The app becomes sluggish. The best way to eliminate all the unnecessary work is to process each received message independently and as instantly as possible: no queueing if it is possible.
Java:
  while (loop) { // max. packet size 8 KB
    DatagramPacket packet = new DatagramPacket(new byte[8192], 8192);
    listener.receive(packet); // wait for the incoming msg
    byte[] bb = new byte[packet.getLength()];
    System.arraycopy(packet.getData(), 0, bb, 0, bb.length);
    P.pool.submit(new ModelWorker(P, bb));
  }
In this case the Runnable implementation of ModelWorker makes more sense than Thread (see HERE) because the necessary "vehicle" for the execution of ModelWorker is the Pool Management (here: the ForkJoinPool). The above snippet shows you how the message passing between the Receiver and the ModelWorker can be reduced to only two steps: create a byte[] and copy the received message into it. The processing time is negligible. Btw, System.arraycopy() is very fast and better than a byte-by-byte loop. It's one of the JAVA native methods.
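The two steps (create a byte[] and copy the received bytes) can be tried without any network. The following is only a sketch: the class ReceiveCopySketch and the method payloadOf() are hypothetical names, the packet is filled by hand instead of by listener.receive(), and a simple Runnable stands in for ModelWorker.

```java
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

final class ReceiveCopySketch {
  // Copy only the bytes that were actually received out of the 8 KB buffer.
  static byte[] payloadOf(DatagramPacket packet) {
    int from = packet.getOffset();
    return Arrays.copyOfRange(packet.getData(), from, from + packet.getLength());
  }

  public static void main(String[] args) throws Exception {
    // Simulate a received packet: 8 KB buffer, only 5 bytes filled.
    byte[] buf = new byte[8192];
    byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
    System.arraycopy(text, 0, buf, 0, text.length);
    DatagramPacket packet = new DatagramPacket(buf, buf.length);
    packet.setLength(text.length);

    byte[] bb = payloadOf(packet);
    // Hand the private copy to the pool; this Runnable stands in for ModelWorker.
    Runnable worker = () -> System.out.println(new String(bb, StandardCharsets.UTF_8));
    Future<?> done = ForkJoinPool.commonPool().submit(worker);
    done.get(); // wait so that the demo prints before the JVM exits
  }
}
```

Arrays.copyOfRange does the same job as the new byte[] plus System.arraycopy() pair in one call. Because each worker gets its own copy, the receive buffer can be overwritten by the next packet without affecting a worker that is still running.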

ModelWorker does the "sort-out work" that a subscriber normally has to do by itself when it receives a batch of messages. Because the data are secured on the Publisher's side before a message is sent to the Subscribers, there is no need to send the "changed data" to the Subscribers, but simply a small note about the change. With the NOTE, ModelWorker is able to sort out which presentation (here: the displayed JTable) has to be "refreshed" from the changes in the database. And that is the main reason why Persistency, Stateful or Stateless, can be avoided on the Subscriber's side.

The queue on the Subscriber's side is the ModelWorker. Each ModelWorker runs independently in the common pool of ForkJoinPool and processes the tasks noted in the message. The NOTE format is defined as follows:
Code:
+-----------+--------+-----+--------------+------+
| NetworkID | dbName | Key | JTable Index | mode |
+-----------+--------+-----+--------------+------+

NetworkID:    a unique ID given by the OODB Server
dbName:       the name of the OODB that has the changes
key:          the accessing key. If there are several keys, they are concatenated and separated by a carriage return (\r)
JTable Index: the index of the JTable that contains the OODB values
              0...5: indexes currently in use. 6...9: reserved for future expansion
              10 and up: combinations of JTable indexes (e.g. 10 for 0 and 1)
mode:         0: update, 1: add, 2: delete
The key is an index assigned to a serialized object. More: see this SECTION.
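To make the NOTE format concrete, here is a small sketch that builds and parses such a note. It assumes the five fields are separated by a tab (as the split("\t") in ModelWorker suggests) and that the bytes are UTF-8 text. The class NoteSketch and its methods encode() and decode() are hypothetical helpers and not part of the original code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for the NOTE: netID, dbName, key(s), JTable index, mode.
final class NoteSketch {
  final String netId;
  final String dbName;
  final String[] keys;   // one or more keys
  final int tableIdx;    // 0...5, or 10 and up for combinations
  final int mode;        // 0: update, 1: add, 2: delete

  private NoteSketch(String netId, String dbName, String[] keys, int tableIdx, int mode) {
    this.netId = netId;
    this.dbName = dbName;
    this.keys = keys;
    this.tableIdx = tableIdx;
    this.mode = mode;
  }

  // Fields are joined by a tab (assumption); several keys are joined by \r.
  static byte[] encode(String netId, String dbName, String[] keys, int tableIdx, int mode) {
    String note = String.join("\t", netId, dbName, String.join("\r", keys),
        Integer.toString(tableIdx), Integer.toString(mode));
    return note.getBytes(StandardCharsets.UTF_8);
  }

  static NoteSketch decode(byte[] bb) {
    String[] f = new String(bb, StandardCharsets.UTF_8).split("\t");
    if (f.length != 5) {
      throw new IllegalArgumentException("expected 5 fields, got " + f.length);
    }
    return new NoteSketch(f[0], f[1], f[2].split("\r"),
        Integer.parseInt(f[3]), Integer.parseInt(f[4]));
  }

  public static void main(String[] args) {
    byte[] bb = encode("net-1", "people", new String[] {"joe", "ann"}, 10, 0);
    NoteSketch n = decode(bb);
    System.out.println(n.dbName + " keys=" + n.keys.length + " idx=" + n.tableIdx + " mode=" + n.mode);
  }
}
```

The note stays tiny because it carries only the key(s), never the changed data. The sketch does not handle keys that themselves contain a tab or a carriage return.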

ModelWorker
Java:
// Joe Nartca (C)
// message format: netID, dbName, key, idx, mode (separated by a tab, see split("\t") below)
// idx: 0...5 index of Models/Tables. 6...9: reserved
//      10 for 0/1, 11 for 2/4, 12 for 3/4, 13 for 2/3/4
public class ModelWorker implements Runnable {
  public ModelWorker(Parms P, byte[] bb) {
    this.bb = bb;
    this.P = P;
  }
  public void run() {
    String[] msg = (new String(bb)).split("\t");
    dbName = msg[1];
    key    = msg[2];
    idx = Integer.parseInt(msg[3]);
    int mode = Integer.parseInt(msg[4]);
    try { // synchronized
      TimeUnit.MICROSECONDS.sleep(P.time);
    } catch (Exception et) { }
    if (mode == 1) {
      addRow();
      return;
    } else if (mode == 2) {
      deleteRow();
      return;
    } else if (mode != 0) { // neither update (0), add (1) nor delete (2)
      Utilities.errorLog(P, "**** ModelWorker. Unknown Mode: "+mode);
      return;
    }
    ...
  //
  private void update(int i1, int i2, int i3) {
    r3 = -1;
    r1 = P.models[i1].getRowAt(key);
    r2 = P.models[i2].getRowAt(key);
    idx = i1;
    if (r1 >= 0) P.models[i1].updateRow(getRow( ), r1);
    idx = i2;
    if (r2 >= 0) P.models[i2].updateRow(getRow( ), r2);
    if (i3 >= 0) {
      idx = i3;
      r3 = P.models[i3].getRowAt(key);
      if (r3 >= 0) P.models[i3].updateRow(getRow( ), r3);
    }
    SwingUtilities.invokeLater(() -> {
      if (r1 >= 0) P.models[i1].fireTableRowsUpdated(r1, r1);
      if (r2 >= 0) P.models[i2].fireTableRowsUpdated(r2, r2);
      if (r3 >= 0) P.models[i3].fireTableRowsUpdated(r3, r3); // r3 belongs to model i3
    });
    return;
  }
  ...
  private void add(int i1, int i2, int i3) {
    r3 = -1;
    r1 = P.models[i1].getRowCount();
    r2 = P.models[i2].getRowCount();
    idx = i1;
    P.models[i1].addRow(getRow( ));
    idx = i2;
    P.models[i2].addRow(getRow( ));
    if (i3 >= 0) {
      r3 = P.models[i3].getRowCount();
      idx = i3;
      P.models[i3].addRow(getRow( ));
    }
    SwingUtilities.invokeLater(() -> {
      P.models[i1].fireTableRowsInserted(r1, r1);
      P.models[i2].fireTableRowsInserted(r2, r2);
      if (i3 >= 0)P.models[i3].fireTableRowsInserted(r3, r3);
    });
  }
  ...
  private void delete(int i1, int i2, int i3) {
    r3 = -1;
    r1 = P.models[i1].getRowAt(key);
    r2 = P.models[i2].getRowAt(key);
    if (r1 >= 0) P.models[i1].removeRow(r1);
    if (r2 >= 0) P.models[i2].removeRow(r2);
    if (i3 >= 0) {
      r3 = P.models[i3].getRowAt(key);
      if (r3 >= 0) P.models[i3].removeRow(r3);
    }
    SwingUtilities.invokeLater(() -> {
      if (r1 >= 0) P.models[i1].fireTableRowsDeleted(r1, r1);
      if (r2 >= 0) P.models[i2].fireTableRowsDeleted(r2, r2);
      if (r3 >= 0) P.models[i3].fireTableRowsDeleted(r3, r3);
    });
  }
  ...
}
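The header comment of ModelWorker lists the combined indexes (10 for 0/1, 11 for 2/4, 12 for 3/4, 13 for 2/3/4). How the original code resolves them is not shown here, so the following lookup is only one possible sketch. The class TableIndexSketch and the method tablesFor() are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical lookup: which models/JTables does a note index address?
final class TableIndexSketch {
  static int[] tablesFor(int idx) {
    if (idx >= 0 && idx <= 5) {
      return new int[] { idx };            // a single table
    }
    switch (idx) {
      case 10: return new int[] { 0, 1 };
      case 11: return new int[] { 2, 4 };
      case 12: return new int[] { 3, 4 };
      case 13: return new int[] { 2, 3, 4 };
      default:                             // 6...9 are reserved, the rest is unknown
        throw new IllegalArgumentException("reserved or unknown index: " + idx);
    }
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(tablesFor(13)));
  }
}
```

With such a lookup, a combination of two tables could be passed to update(i1, i2, -1) and a combination of three to update(i1, i2, i3), which matches the i3 >= 0 checks in the methods above.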
So, the queue implementation on the Publisher's and the Subscriber's side can be realized without involving a heavyweight JMS Message Container. Neither IBM MQ, nor RabbitMQ. One can achieve this goal with ease. And the best of it is that you have full control over your code. Fast, free of charge and free of license.

The following image visualizes the working sequence of P2P-PubSub:

generic.png
  1. Client A opens a connection to a Serialized Object Database Server
  2. The Server spawns a proxy peer to represent the P2P communication
  3. Client A makes some changes (update/add/delete) and commits them
  4. Client A publishes its changes via a message saying that the database dbName has a new status for the given key(s).
  5. Client B, which subscribes to the database dbName, receives the message and delegates it to a Worker, which works out from the content whether Client B is affected or not
  6. If the message is a "subscribed message", the affected JTable is refreshed according to the changes in the database.
This P2P-PubSub cooperation virtually guarantees the persistency of the message content. Meaning: the message content (the changes of the database) does not need to be preserved. Whenever a subscriber comes up, it always gets the newest state of the database. Furthermore, a network message very rarely gets lost during the online sessions between the participants.

An example of two different JTables that constantly need a "refresh".

JTables.png

(Continued HERE)
 