Cassandra – Java – Exemple d’importation de données à partir d’un fichier plat


Cassandra est un noyau de bases de données dédié Big Data fait pour stocker de grandes quantités de données aux fins d’analyse. Comme toutes bases de données analytiques, l’une des étapes essentielles est le chargement massif de données bien souvent en provenance de sources hétérogènes de données (bases de données des opérations courantes, mainframe, …).

L’un des modes d’alimentation très courant peut passer par l’import de données sous forme de fichier CSV (Comma-Separated Values) lorsqu’aucun pilote de base de données n’existe. Il existe toutefois des pilotes JDBC permettant de lire les fichiers CSV, leur utilisation sortant du cadre de cet article j’ai directement lu le fichier avec les classes FileReader et BufferedReader, puisqu’il s’agit surtout de montrer comment charger des données dans Cassandra.

L’exemple qui suit en Java montre une possibilité parmi tant d’autres d’importer le contenu d’un fichier CSV dans une table d’un keyspace Cassandra.

Classe LoadVarTable de manipulation du fichier CSV

Cette classe est très basique et n’est jamais que la manipulation d’un fichier présent sur disque. Elle est là pour pouvoir envoyer les données à l’objet de classe de chargement effectif des données (voir classe suivante).

La forme du fichier est la suivante :

An_Recens,1968,Recensement de 1968
An_Recens,1975,Recensement de 1975
An_Recens,1982,Recensement de 1982
An_Recens,1990,Recensement de 1990
An_Recens,1999,Recensement de 1999
An_Recens,2009,Recensement de 2009

La première valeur est le type fonctionnel de la donnée, la deuxième la donnée en format court, la troisième la donnée en format description longue (humaine).

Classe LoadVarTable :

package loadVarTable;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class LoadVarTable {

 public static void main(String[] args) {
     LoadIntoCassandra lIC = new LoadIntoCassandra();
     try 
     {
         BufferedReader in = new BufferedReader(
                                 new FileReader("./varmod.csv"));
         String str = "";
         while((str = in.readLine()) != null) {
             String[] params = str.split(",");
             lIC.loadData(params);
             System.out.println(str);
         }
         in.close();
         System.exit(0);
     } catch(IOException e) {
         System.out.println("Fichier inexistant");
         System.exit(1);
} } }

Classe LoadIntoCassandra de chargement effectif des données dans Cassandra

Cette classe permet le chargement des données dans la table type_data. La classe se sert de la possibilité offerte par le driver JDBC de préparer les requêtes et de passer ensuite les données en paramètres au moment de l’exécution de la requête. Ainsi, le moteur de base de données n’a pas besoin d’analyser la requête et définir son plan d’exécution avant chaque exécution.

La préparation de requête passe par l’objet de classe PreparedStatement. Le passage des paramètres à l’exécution passe par la méthode bind de l’objet de classe PreparedStatement. La préparation de l’instruction n’est faite qu’une fois sur toute la durée de la session lors de la construction de l’objet de classe LoadIntoCassandra.

La méthode execute, comme son nom l’indique, procède à l’exécution de l’instruction insert préparée juste avant.

Classe LoadIntoCassandra :

package loadVarTable;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

public class LoadIntoCassandra {

 static final String USER = "recensement";
 static final String PWD = "xxxxxxxxxxxx";
 static final String KEYSPACE = "recensement";
 static final String SERVER = "192.168.1.101";
 static final String LOCALDC = "dc1";
 static final int USEDHOSTS = 6;
 static final String INSERT = "insert into "
         + "type_data(nom_data,data,data_desc) values(?,?,?)";
 
 Cluster cluster;
 Session session;
 PreparedStatement stmt;
 
 public LoadIntoCassandra() {
     this.cluster = Cluster.builder()
         .addContactPoint(SERVER)
         .withCredentials(USER, PWD)
         .withLoadBalancingPolicy(
              DCAwareRoundRobinPolicy.builder()
                  .withLocalDc(LOCALDC)
                  .withUsedHostsPerRemoteDc(USEDHOSTS)
                  .allowRemoteDCsForLocalConsistencyLevel()
                  .build())
         .build();
     this.session = cluster.connect(KEYSPACE);
     this.stmt = this.session.prepare(INSERT);
 }
 
 public void loadData(String[] data) {
 
     this.session.execute(this.stmt.bind(data[0],data[1],data[2]));

 }


}