Utilizzare i mutex di MySQL per la sincronizzazione di processi separati

Non tutti sono a conoscenza di alcune interessanti API messe a disposizione di MySQL per la sincronizzazione di processi; MySQL infatti rende disponibile agli sviluppatori delle basilari funzioni per la gestione di mutex remoti. Le funzioni che analizzeremo (e che sono documentate qui) sono: 

 

  • GET_LOCK(name, timeout): ottiene il lock sul mutex indicato (che si chiama name) , aspettando al massimo per timeout secondi. La funzione restituisce 1 se il lock è stato acquisito, 0 se non si è riuscito ad ottenre il lock prima dello scadere del timeout, NULL in caso di errore. Il lock verrà rilasciato all'accadere di uno dei seguenti eventi: (1) chiusura (voluta o forzata) della connessione al database, (2) nuovo utilizzo di GET_LOCK (questo vuol dire che non è possibile lockare più mutex contemporaneamente sulla stessa connessione), (3) si chiama RELEASE_LOCK per il rilascio esplicito del mutex.
  • RELEASE_LOCK(name): rilascia il lock sul mutex indicato. Restituisce 1 se il lock è stato rilasciato, 0 altrimenti (oppure NULL se il mutex non esiste).
  • IS_FREE_LOCK(name): restituisce 1 se il mutex non è lockato, 0 altrimenti (NULL in caso di errore).
  • IS_USED_LOCK(name): restituisce l'ID della connessione del thread che detiene il mutex, NULL se il mutex non è lockato da nessuno.

Come utilizzare queste funzioni

L'utilizzo classico è:

  1. ottengo il lock (GET_LOCK)
  2. eseguo delle operazioni
  3. rilascio il lock (RELEASE_LOCK)

con questo procedimento tutte le operazioni eseguite al punto 2 saranno eseguite esclusivamente da un processo per volta.

Vediamo una possibile implementazione in Groovy di una classe che sfrutti queste funzionalità di MySQL. Tale implementazione metterà a disposizione i seguenti metodi:

  • lock(), ottiene il lock sul mutex (la chiamata è bloccante, il controllo non verrà sostituito fino all'ottenimento del lock)
  • lock(int timeout), ottiene il lock sul mutex attendendo al massimo timeout secondi. In caso scatti il timeout il lock non è acquisito e viene ritornato il valore false.
  • unlock(), rilascia il lock
  • isLocked(), restutuisce true se il mutex risulta lockato (dal thread corrente oppure da un altro thread)
  • isLockOwned(), restituisce true se il mutex è lockato e se il lock è del thread corrente
  • execute(Closure closure), esegue il blocco di codice indicato in maniera esclusiva (cioè esegue un lock-codice-unlock)

--- INIZIO FILE MyMutex.groovy ---

package it.lorenzoingrilli.mymutex

import groovy.sql.Sql
import java.sql.Connection
import java.sql.SQLException

class MyMutex {

private String name
private Connection connection
private Sql sql

public MyMutex(String name, Connection connection) {
    this.name = name;
    this.connection = connection;
    this.sql = new Sql(connection);
}

public void execute(Closure closure) {
    lock()
    closure.call()
    unlock()
}

public synchronized void lock() throws SQLException {
    while(!lock(10));
}

public synchronized boolean lock(int timeout) throws SQLException {
    if(timeout<0)
        throw new IllegalArgumentException("Timeout must be >= 0");
    def result = sql.firstRow("SELECT GET_LOCK(${name}, ${timeout}) AS locked");
    return result!=null && result.locked==1;
}

public synchronized void unlock() throws SQLException {
    sql.execute("DO RELEASE_LOCK(${name})")
}

public synchronized boolean isUnlocked() throws SQLException {
    def result = sql.firstRow("SELECT IS_FREE_LOCK(${name}) AS free")
    return result!=null && result.free==1 
}

public synchronized boolean isLocked() throws SQLException {
    return !isUnlocked();
}

public synchronized boolean isLockOwned() throws SQLException {
    // ottengo il connection ID corrente 
    long connectionId = sql.firstRow("SELECT CONNECTION_ID() AS cid").cid
    // ottengo il connection ID del thread che ha il lock sul mutex
    Long lockOwner = sql.firstRow("SELECT IS_USED_LOCK(${name}) AS cid")?.cid
    return lockOwner!=null && lockOwner==connectionId;
}

}

 

--- FINE FILE MyMutex.groovy ---

 

La classe MyMutex necessità di una connessione al database e del nome del mutex da utilizzare. 

Mettiamo alla prova la classe MyMutex. Per farlo scriviamo due test case molto rozzi, nel primo lanciamo 3 thread non sincronizzati, nel secondo invece lanciamo i thread sincronizzandoli tramite di MyMutex.

 

Lanciamo 3 processi differenti (e concorrenti) del test case TestWithoutMutex.groovy:

def id = System.currentTimeMillis();
println "thread ${id}: inizio"
Thread.sleep(250);
println "thread ${id}: fine"

tramite il comando:

groovy TestWithoutMutex.groovy & groovy TestWithoutMutex.groovy &  groovy TestWithoutMutex.groovy & 
ottenendo il seguente output:

	
thread 1298331951760: inizio
thread 1298331951760: fine
thread 1298331952040: inizio
thread 1298331952103: inizio
thread 1298331952040: fine
thread 1298331952103: fine

In questo caso notiamo come i thread si "mischino" tra loro (si evince dalle più scritte inizio oppure fine di seguito); la cosa rende evidente la non sincronizzazione dei thread. In questo esempio particolare i due thread 1298331952040 e 1298331952103 si sono sovrapposti

 

 

Prendiamo invece in esame il test case TestMutex.groovy:

import java.sql.DriverManager
import it.lorenzoingrilli.mymutex.MyMutex

String username = "test"
String password = "testpsw"
String url = "jdbc:mysql://localhost/test"
String mutexName = "test"

def mutex = new MyMutex(mutexName, DriverManager.getConnection(url, username, password));
mutex.execute {
    def id = System.currentTimeMillis();
    println "thread ${id}: inizio"
    Thread.sleep(250);
    println "thread ${id}: fine"
}
Lanciamo 3 processi differenti (e concorrenti) tramite il comando:
groovy TestMutex.groovy & groovy TestMutex.groovy &  groovy TestMutex.groovy & 
ottenendo il seguente output:
thread 1298331820382: inizio
thread 1298331820382: fine
thread 1298331820723: inizio
thread 1298331820723: fine
thread 1298331821048: inizio
thread 1298331821048: fine

notiamo che tra "inizio" e "fine" di ogni thread non ci sono intromissioni da parte di altri thread, il che ci mostra come i thread siano correttamente sincronizzati. Questa sincronizzazione avviene in tutti i casi: (a) thread differenti all'interno della stessa virtual machine, (b) processi separati in esecuzione sulla stessa macchina, (3) processi separati in esecuzione su macchine separati

NOTA 1: Questo articolo è stato scritto pensando ad un unico server MySQL che più processi utilizzeranno per sincronizzarsi tra loro. Per certo le tecniche indicate non funzioneranno in configurazioni di replication master-slave di MySQL (a meno che tutti i software si sincronizzino utilizzando solo ed esclusivamente il server master). Non è stato effettuato alcun test su MySQL Cluster.

NOTA 2: ricordatevi che per eseguire i codici di esempio dovete avere il Connector/J JDBC per MySQL nel vosto CLASSPATH