Hadoop et MapReduce en action : exemple sur dataset millionsong

Hadoop et MapReduce en Java

Ce site ne sera plus alimenté de contenu après août 2014. Tous les nouveaux articles seront redigés pour www.waitingforcode.com
Jusque là on a vu les exemples simples d'utilisation du MapReduce et de Hadoop. Cependant, les outils permettant de mettre en place les cas de figure plus élaborés, existent. Et dans cet article on montrera comment construire une mini-application Hadoop et MapReduce avec une centaine des données à traiter.

Tout d'abord on verra la spécification du projet. Ensuite on abordera la partie du code pour MapReduce. A la fin on découvrira une nouvelle fonctionnalité de ce modèle.

Exemple de projet MapReduce et Hadoop
Dans notre exemple on utilisera le dataset (groupe des données) mis à disposition par les chercheurs de l'Université de Columbia, connu sous le nom de Million song dataset. On peut télécharger n'importe quel groupe des données. Chaque groupe contient alors une liste des chansons avec des informations très précises, comme : le chanteur, la ville, le titre, l'album ainsi que même les tags associés à la chanson.

Notre projet va consister à récupérer les chansons et à sortir une liste des chanteurs ayant le plus de chansons composées dans le dataset. Les fichiers du dataset sont du type binaire, HDF5. Ce format a été développé par NASA pour gérer de grands volumes de données. Les données peuvent être compressées tout en garantissant la rapidité au niveau Input/Output impressionant. Vu que l'intégration des fichiers HDF5 n'est pas garantit en natif dans Hadoop, on devra les transformer en fichiers JSON. Pour ce faire on utilisera la commande suivante : h5dump -d metadata/songs -o ./outputFile.json -y inputFile.hdf5. Tous les fichiers utilisés peuvent être installés via (pour Ubuntu) Synaptic Application Manager ou le téléchargement depuis le site HDF5

Ensuite, après l'exécution de cette commande sur chaque fichier, il faudra les copier au HDFS avec la commande suivante hadoop dfs -copyFromLocal localFile.json hdfsFile.json. Maintenant la phase pré-éliminaire est faite. On peut passer au développement des méthodes map et reduce.

MapReduce avec Million song dataset
Notre classe de map va faire les choses le plus simplement possible. Tout d'abord elle va splitter le fichier JSON reçu par les sauts de ligne. Ensuite, en fonction de la ligne, elle va passer les paramètres à la classe Song qui représentera chaque chanson. A la fin, le résultat sera rajouté avec la clé correspondant au nom du chanteur. Voici le code de la méthode map() et normalizeString() qui sert à supprimer les guillements et virgules de la chaîne trouvée :

@Override
public void map(NullWritable key, Text value, OutputCollector<Text, Song> collector, Reporter reporter) throws IOException {
Song song = new Song();
String[] parts = value.toString().split("\n");
for (int i = 0; i < parts.length; i++) {
switch (i) {
case 7:
song.setCity(new Text(normalizeString(parts[i])));
break;
case 10:
song.setArtist(new Text(normalizeString(parts[i])));
break;
case 19:
song.setTitle(new Text(normalizeString(parts[i])));
break;
}
}
try {
collector.collect(song.getArtist(), song);
System.out.println("Collected song :"+song + " for json file :"+value.toString());
} catch (Exception e) {
System.out.println("An error occured on setting result for song : "+song+". Message's error : "+e.getMessage());
System.out.println("Value for JSON is :"+value.toString());
}
}

private String normalizeString(String text) {
if (text == null) return "";
return text.replaceAll("\",|\"", "").trim();
}


La fonction reduce va récupérer les groupes créés par la méthode map et calcule le nombre de chansons présentes. La voici :

private static long generalKey = 0;
@Override
public void reduce(Text key, Iterator<Song> values, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
int songs = 0;
String artistName = "";
while (values.hasNext()) {
Song song = (Song) values.next();
if (artistName.equals("")) {
artistName = song.getArtist().toString();
}
System.out.println("Got song : " + song);
if (song != null && song.getTitle() != null && !song.getTitle().toString().trim().equals("")) {
songs++;
}
}
output.collect(new LongWritable(generalKey++), new Text(""+songs+";"+artistName));
}


Les deux méthodes n'ont rien de mystérieux. Cependant, au moment de leur mise en place, plusieurs problèmes apparaissent. On les détaillera un par un dans la liste ci-dessous :

  • Comment utiliser une classe entité et la transmettre à travers les processus MapReduce ?
    Par définition, les classes doivent être sérializables pour pouvoir circuler entre plusieurs ordinateurs dans le cluster. On ne pourra donc pas créer une simple classe JavaBean. A la place, on devra concevoir une classe qui implémente l'interface WritableComparable. En outre, tous les champs de cette classe doivent être du type de données utilisées dans Hadoop, à savoir Text, IntWritable, LongWritable etc. Voici comment se présente la classe en question :

    public class Song implements WritableComparable<Song> {
    private Text city;
    private Text artist;
    private Text title;

    public Song() {
    artist = new Text();
    city = new Text();
    title = new Text();
    }

    public Song(Text artist, Text title, Text city) {
    setArtist(artist);
    setTitle(title);
    setCity(city);
    }

    public Text getArtist() {
    return this.artist;
    }
    public void setArtist(Text artist) {
    this.artist = artist;
    }
    public Text getTitle() {
    return this.title;
    }
    public void setTitle(Text title) {
    this.title = title;
    }
    public Text getCity() {
    return this.city;
    }
    public void setCity(Text city) {
    this.city = city;
    }

    public String toString() {
    StringBuilder result = new StringBuilder("Song : { ");
    result.append("artist : ").append(getArtist());
    result.append(", title :").append(getTitle());
    result.append(", city :").append(getCity());
    result.append(" }");
    return result.toString();
    }

    @Override
    public int hashCode() {
    return artist.hashCode() * 163 + title.hashCode();
    }

    @Override
    public boolean equals(Object object) {
    if (object instanceof Song) {
    Song song = (Song) object;
    return artist.equals(song.getArtist()) && title.equals(song.getTitle()) && city.equals(song.getCity());
    }
    return false;
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
    if (dataInput != null) {
    artist.readFields(dataInput);
    title.readFields(dataInput);
    city.readFields(dataInput);
    }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
    artist.write(dataOutput);
    title.write(dataOutput);
    city.write(dataOutput);
    }
    @Override
    public int compareTo(Song song) {
    return title.compareTo(song.getTitle());
    }
    }

    Les deux méthodes servent dans la sérialisation et déserialisation, respectivement write et readFields. Dans la première on sauvegarde les données et dans la deuxième on les récupère. Il est très important de mentionner à ce niveau-là, que la classe doit posséder un constructeur publique sans paramètres. Ce constructeur doit initialiser tous les champs utilisés dans la sérialisation et la déserialisation (dans notre code, par exemple avec artist = new Text()). Dans le cas contraire, on aura droit à une exception :

    java.lang.NullPointerException
    at songClient.entity.Song.readFields(Song.java:72)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
    at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1282)
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1222)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:250)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:246)
    at songClient.ReducerSong.reduce(ReducerSong.java:30)
    at songClient.ReducerSong.reduce(ReducerSong.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:520)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

    MapReduce n'initialise pas les champs tout seul. Il se contente uniquement d'y placer une valeur. C'est la raison pour laquelle les champs doivent être initialisés à chaque fois.

  • Comment lire un fichier entier en Hadoop et MapReduce ?
    Par définition, les fichiers sont lus ligne par ligne. Cela signifie que chaque ligne est soumise à la tâche map. Cependant, ce comportement est inapproprié à notre situation où l'on veut recevoir les fichiers en entier pour pouvoir les splitter par nous-mêmes. D'où l'obligation de spécifier un format d'entrée dans la configuration. Cela se fait avec l'appel configuration.setInputFormat(JsonFileInputFormat.class);. Notre classe de lecture du fichier en entier se présente de la manière suivante :

    public class JsonFileInputFormat extends FileInputFormat<NullWritable, Text> {

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
    return false;
    }

    @Override
    public RecordReader getRecordReader(InputSplit inputSplit, JobConf configuration, Reporter reporter) throws IOException {
    return new JsonFileRecordReader((FileSplit) inputSplit, configuration);
    }

    }

    Grâce à la méthode isSplitable() on sait que le fichier d'entrée ne peut pas être découpé par lignes. La fonction getRecordReader() fournit la classe qui s'occupera alors de lire le fichier et de nous le fournir en entier. Voici cette classe :

    public class JsonFileRecordReader implements RecordReader<NullWritable, Text> {

    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;

    public JsonFileRecordReader(FileSplit fileSplit, Configuration conf)
    throws IOException {
    this.fileSplit = fileSplit;
    this.conf = conf;
    }

    @Override
    public void close() throws IOException {
    // TODO Auto-generated method stub

    }

    @Override
    public NullWritable createKey() {
    return NullWritable.get();
    }

    @Override
    public Text createValue() {
    return new Text();
    }

    @Override
    public long getPos() throws IOException {
    return processed ? fileSplit.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
    }

    @Override
    public boolean next(NullWritable key, Text value) throws IOException {
    if (!processed) {
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream in = null;
    try {
    byte[] contents = new byte[(int) fileSplit.getLength()];
    in = fs.open(file);
    IOUtils.readFully(in, contents, 0, contents.length);
    value.append(contents, 0, contents.length);
    } finally {
    IOUtils.closeStream(in);
    }
    processed = true;
    return true;
    }
    return false;
    }

    }


    Méthode la plus intéressante est next(). C'est elle qui lit chaque fichier qui n'a pas été traité. Le contenu trouvé est ensuite rajouté à l'élément value de la paire "clé:valeur", utilisée par tout le mécanisme MapReduce. D'autres méthodes permettent de suivre le progrès de traitement ainsi qu'initialiser la clé et la valeur. Il faut noter que la clé est issue de la classe NullWritable. Cela s'explique par le fait que l'on veut traiter le fichier JSON en tant qu'un fichier, se détachant un peu de la logique "clé:valeur" (un peu car la notion de la clé est toujours présente, sauf que la clé n'est pas spécifiée).


  • Comment lire le résultat groupé de la tâche MapReduce ?
    Souvent plusieurs fichiers avec les résultats peuvent être créées. Il peut alors s'avérer difficile de pouvoir les analyser une fois l'opération MapReduce terminée. Pour faciliter cette tâche, notre code va utiliser un format spécial de sortie qui sera MapFileOutputFormat. Ce format garantit la création d'une fichier de sortie contenant, dans les fichiers séparés, les clés et les valeurs. Un peu comme la Map en Java, MapFileOutputFormat permet d'itérer à travers le résultat pour, par exemple, le trier à nouveau.
    Pour spécifier le format map dans, on peut utiliser la méthode configuration.setOutputFormat(MapFileOutputFormat.class).
    Cependant, on peut alors rencontrer un problème. Dans notre code, la fonction reduce devrait créer la sortie du type "nom_chanteur:nombre_chansons". Mais une contrainte l'empêche. MapFileOutputFormat impose à la méthode reduce deux choses :
    - la clé étant l'instance de LongWritable
    - la valeur étant l'instance de Text
    Si ce n'est pas le cas, on peut rencontrer l'exception suivant au moment de l'exécution :

    13/08/17 14:08:15 INFO mapred.JobClient: map 100% reduce 33%
    13/08/17 14:08:19 INFO mapred.JobClient: Task Id : attempt_201308170801_0034_r_000000_2, Status : FAILED
    java.io.IOException: wrong key class: Carl Cox Vs Yousef is not class org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.append(SequenceFile.java:1360)
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1039)
    at org.apache.hadoop.io.MapFile$Writer.append(MapFile.java:196)
    at org.apache.hadoop.mapred.MapFileOutputFormat$1.write(MapFileOutputFormat.java:74)
    at org.apache.hadoop.mapred.MapFileOutputFormat$1.write(MapFileOutputFormat.java:69)
    at org.apache.hadoop.mapred.ReduceTask$OldTrackingRecordWriter.write(ReduceTask.java:458)
    at org.apache.hadoop.mapred.ReduceTask$3.collect(ReduceTask.java:498)
    at songClient.ReducerSong.reduce(ReducerSong.java:34)
    at songClient.ReducerSong.reduce(ReducerSong.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:520)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)



Maintenant, que tous les problèmes potentiels sont résolus, on peut voir la classe qui va s'occuper d'exécuter la tâche :

public class WorkerSong {
public static void main(String[] args) throws IOException {
JobConf configuration = new JobConf(WorkerSong.class);
configuration.setJobName("Worker sample test");

FileSystem hdfsFileSystem = FileSystem.get(configuration);
hdfsFileSystem.delete(new Path("/user/bartosz/sampleOutput1"), true);
Date startDate = new Date();
System.out.println("Start at "+startDate);
long startMs = startDate.getTime();

FileInputFormat.addInputPath(configuration, new Path("/user/bartosz/*.json"));

FileOutputFormat.setOutputPath(configuration, new Path("/user/bartosz/sampleOutput1"));
configuration.setMapperClass(MapperSong.class);
configuration.setReducerClass(ReducerSong.class);
configuration.setOutputFormat(MapFileOutputFormat.class);

configuration.setInputFormat(JsonFileInputFormat.class);
configuration.setMapOutputKeyClass(Text.class);
configuration.setMapOutputValueClass(Song.class);
System.out.println("Configuration : "+configuration);
JobClient.runJob(configuration);
Date endDate = new Date();
System.out.println("End at "+endDate);
long endMs = endDate.getTime();
long elapsedTimeMs = endMs-startMs;
System.out.println("Task done in " + String.format("%d min, %d sec",
TimeUnit.MILLISECONDS.toMinutes(elapsedTimeMs),
TimeUnit.MILLISECONDS.toSeconds(elapsedTimeMs) -
TimeUnit.MINUTES.toSeconds(TimeUnit.MILLISECONDS.toMinutes(elapsedTimeMs))
));
}
}

Quelques nouveaux éléments ont apparu par rapport à la même classe de l'article consacré à la mise en place de Hadoop. Tout d'abord, on remarque la présence d'on compteur qui servira à afficher le temps total d'exécution de la tâche. Egalement la définition des fichiers à inclure (addInputPath()) a changé. Le nom du fichier a été remplacé par un wildcard (*). Cette expression ressemble à celle utilisée dans les expressions régulières et signifie que l'on va traiter tous les fichiers du répertoire /user/bartosz qui ont une extension .json. Une autre différence est la présence de la méthode setOutputFormat() qui spécifie le format de sortie pour la tâche. On détermine aussi le format d'entrée (setInputFormat()). Si l'on remonte dans l'article, on verra qu'il sert à lire les fichiers JSON en entier et non pas ligne par ligne. La valeur passée à la méthode setMapOutputValueClass() a également été modifié. A la place d'une classe standarde de la librairie, on passe la classe personnalisée, représentant une chanson dans notre dataset. A la fin on démarre notre tâche comme avant, avec l'appel JobClient.runJob(configuration).

Voici le résultat du traitement :

bartosz@bartosz-K70ID:~/hadoop-1.1.2$ hadoop songClient.WorkerSong
Warning: $HADOOP_HOME is deprecated.

Start at Sat Aug 17 15:12:18 CEST 2013
Configuration : Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, hdfs-default.xml, hdfs-site.xml
13/08/17 15:12:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/08/17 15:12:18 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/08/17 15:12:19 INFO mapred.FileInputFormat: Total input paths to process : 500
13/08/17 15:12:21 INFO mapred.JobClient: Running job: job_201308170801_0044
...
13/08/17 15:13:32 INFO mapred.JobClient: map 4% reduce 0%
13/08/17 15:14:41 INFO mapred.JobClient: map 8% reduce 2%
13/08/17 15:21:30 INFO mapred.JobClient: map 34% reduce 11%
13/08/17 15:24:54 INFO mapred.JobClient: map 47% reduce 15%
13/08/17 15:28:59 INFO mapred.JobClient: map 60% reduce 20%
13/08/17 15:35:33 INFO mapred.JobClient: map 85% reduce 28%
13/08/17 15:40:02 INFO mapred.JobClient: map 100% reduce 33%
13/08/17 15:39:57 INFO mapred.JobClient: map 99% reduce 33%
13/08/17 15:40:02 INFO mapred.JobClient: map 100% reduce 33%
13/08/17 15:40:18 INFO mapred.JobClient: map 100% reduce 100%
13/08/17 15:40:22 INFO mapred.JobClient: Job complete: job_201308170801_0044
13/08/17 15:40:22 INFO mapred.JobClient: Counters: 30
13/08/17 15:40:22 INFO mapred.JobClient: Job Counters
13/08/17 15:40:22 INFO mapred.JobClient: Launched reduce tasks=1
13/08/17 15:40:22 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=3236772
13/08/17 15:40:22 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/08/17 15:40:22 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/08/17 15:40:22 INFO mapred.JobClient: Launched map tasks=500
13/08/17 15:40:22 INFO mapred.JobClient: Data-local map tasks=500
13/08/17 15:40:22 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=1591340
13/08/17 15:40:22 INFO mapred.JobClient: File Input Format Counters
13/08/17 15:40:22 INFO mapred.JobClient: Bytes Read=219600
13/08/17 15:40:22 INFO mapred.JobClient: File Output Format Counters
13/08/17 15:40:22 INFO mapred.JobClient: Bytes Written=16039
13/08/17 15:40:22 INFO mapred.JobClient: FileSystemCounters
13/08/17 15:40:22 INFO mapred.JobClient: FILE_BYTES_READ=31404
13/08/17 15:40:22 INFO mapred.JobClient: HDFS_BYTES_READ=268100
13/08/17 15:40:22 INFO mapred.JobClient: FILE_BYTES_WRITTEN=26169178
13/08/17 15:40:22 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=16039
13/08/17 15:40:22 INFO mapred.JobClient: Map-Reduce Framework
13/08/17 15:40:22 INFO mapred.JobClient: Map output materialized bytes=34398
13/08/17 15:40:22 INFO mapred.JobClient: Map input records=500
13/08/17 15:40:22 INFO mapred.JobClient: Reduce shuffle bytes=34398
13/08/17 15:40:22 INFO mapred.JobClient: Spilled Records=1000
13/08/17 15:40:22 INFO mapred.JobClient: Map output bytes=30396
13/08/17 15:40:22 INFO mapred.JobClient: Total committed heap usage (bytes)=80831578112
13/08/17 15:40:22 INFO mapred.JobClient: CPU time spent (ms)=227280
13/08/17 15:40:22 INFO mapred.JobClient: Map input bytes=219600
13/08/17 15:40:22 INFO mapred.JobClient: SPLIT_RAW_BYTES=48500
13/08/17 15:40:22 INFO mapred.JobClient: Combine input records=0
13/08/17 15:40:22 INFO mapred.JobClient: Reduce input records=500
13/08/17 15:40:22 INFO mapred.JobClient: Reduce input groups=470
13/08/17 15:40:22 INFO mapred.JobClient: Combine output records=0
13/08/17 15:40:22 INFO mapred.JobClient: Physical memory (bytes) snapshot=90486362112
13/08/17 15:40:22 INFO mapred.JobClient: Reduce output records=470
13/08/17 15:40:22 INFO mapred.JobClient: Virtual memory (bytes) snapshot=562404990976
13/08/17 15:40:22 INFO mapred.JobClient: Map output records=500
End at Sat Aug 17 15:40:22 CEST 2013
Executed at 1684448 ms
Task done in 28 min, 4 sec


Trier les résultats dans MapReduce
La tâche reduce nous retourne une sorte de Map Java. On peut donc itérer à travers tous les résultats. Pour effectuer l'itération on doit utiliser un org.apache.hadoop.io.MapFile.Reader. Elle diffère un peu de l'interface java.util.Map par le fait qu'elle ne fournit que l'accès aux données en lecture. Ensuite on observera la présence d'un org.apache.hadoop.mapred.Partitioner dont le rôle consiste à récupérer le numéro de partition pour une clé donnée.

Il faut alors savoir que la classe MapFileOutputFormat retourne deux fichiers séparés : index et data. Le premier contient les clés de la map tandis que le deuxième les valeurs qui y correspondent. Les clés sont placées dans les partitions dont le nombre est égal au nombre des tâches reduce dans l'application. C'est la raison qui explique la présence du Partitioner.


public class TheBestArtistsWorker {

public static void main(String[] args) throws IOException {
JobConf configuration = new JobConf(WorkerSong.class);
configuration.setJobName("Worker sample test");
Path path = new Path("sampleOutput1");
LongWritable key = new LongWritable(0);

FileSystem fs = path.getFileSystem(configuration);
Reader[] readers = MapFileOutputFormat.getReaders(fs, path, configuration);
Partitioner<LongWritable, Text> partitioner = new HashPartitioner<LongWritable, Text>();
Text val = new Text();

Reader reader = readers[partitioner.getPartition(key, val, readers.length)];

Map> artistsRating = new TreeMap<Integer, List<String>>(Collections.reverseOrder());
ArtistParser parser = new ArtistParser();
LongWritable nextKey = new LongWritable(0);
while(reader.next(nextKey, val)) {
parser.parse(val.toString());
if (parser.isOkResult()) {
List<String> artistsList = null;
if (artistsRating.containsKey(parser.getSongs())) {
artistsList = artistsRating.get(parser.getSongs());
} else {
artistsList = new ArrayList<String>();
}
artistsList.add(parser.getArtist());
artistsRating.put(parser.getSongs(), artistsList);
}
}

Iterator<Integer> keysIterator = artistsRating.keySet().iterator();
while (keysIterator.hasNext()) {
int iteratorKey = keysIterator.next();
System.out.println(iteratorKey + " songs for artists : "+artistsRating.get(iteratorKey));
}

}
}

Dans un premier temps on récupère le tableau des "lecteurs" (Reader[]) pour un résultat de MapReduce. Ensuite on initialise les Partitioners qui vont se charger de lire les partitions dans la map. Après on récupère le lecteur pour la première clé générée (0) et on commence l'itération avec la boucle while(). Dedans, chaque résultat est traité par le parseur qui ne fait rien d'autre que splitter la ligne pour récupérer le nom du chanteur et le nombre des chansons. A la fin on affiche tout sur l'écran. Et voici la classe du parseur qui n'a rien de compliqué :

class ArtistParser {

private String artist;
private int songs;

public void parse(String line) {
songs = 0;
artist = null;
String[] result = line.split(";");
try {
songs = Integer.parseInt(result[0]);
artist = result[1];
} catch (Exception e) {
System.out.println("An error occured on parsing line " + line + " : " + e.getMessage());
}
}

public void parse(Text line) {
parse(line.toString());
}

public String getArtist() {
return artist;
}

public int getSongs() {
return songs;
}

public boolean isOkResult() {
return (artist != null);
}

}


L'article a présenté de nouveaux concepts du monde Hadoop et MapReduce. On a vu qu'il était possible d'analyser les fichiers entiers, grâce au développement d'un format de sortie. Ensuite on a présenté comment analyser le résultat de MapReduce sous forme d'un MapFileOutputFormat et d'en créer une liste complète et triée. MapReduce est donc un outil puissant, adapté à plusieurs scénarios d'usage quotidien.
Bartosz KONIECZNY 08-09-2013 16:15 Hadoop
Moi

Développeur d'applications Internet et journaliste passionné par l'adjectif français. Un aigle polonais orienté vers la progression, volant très haut et écoutant du zouk après les matches du foot français.

Vous appréciez mon travail ?

Pour contribuer au développement de ce site, ou pour remercier pour des articles rédigés, vous pouvez faire un don.

Un conseil Zend Framework

Comment faciliter le travail entre notre application et l'Ajax ?

L'un des moyens est l'utilisation du ContextSwitch. Il permet de définir le type de réponse renvoyé.