Configuration Hadoop et MapReduce

Hadoop et MapReduce en Java

Ce site ne sera plus alimenté de contenu après août 2014. Tous les nouveaux articles seront redigés pour www.waitingforcode.com
Jusque là on a vu deux aspect fondamentaux de l'Hadoop : HDFS et MapReduce. Maintenant on peut aller plus en détail et présenter la thématique de configuration.

Cet article consacré à la configuration de l'Hadoop sera divisé en plusieurs sous-parties. Chacune contiendra une liste des options bonnes à connaître pour mettre en place le système correctement. Au début on verra les options liées au système des fichiers HDFS. Plus loin on abordera celles qui concernent le mécanisme MapReduce. Après on listera les paramètres qui ne rentrent dans aucune de ces deux catégories. A la fin on présentera l'écriture de ces configurations dans le fichier.

Une multiple valeur entre parenthèse, séparée par un pipe (|), indique que l'un des éléments séparés peut y être placé. Par exemple si une propriété contient (map|reduce), cela veut dire qu'à la place du parénthèse il faut laisser soit map, soit reduce.

Options système de fichiers
* hadoop.tmp.dir : indique le répertoire temporaire dans lequel vont être stockés les fichiers temporaires. Sa valeur par défaut correspond au /tmp/hadoop-${nom_utilisateur}

* fs.default.name : le nom du système de fichiers par défaut. Il s'agit d'une URI. La valeur par défaut est file://. Pour qu'il correspond à l'HDFS, il peut par exemple prendre comme valeur : hdfs://localhost

* dfs.block.size : la taille du block dans HDFS. Par défaut il correspond au 64MB. Il faut garder en tête qu'aucun espace n'est perdu. La taille du block est occupée à 100%. On peut donc sans aucun problème laisser 64MB pour de petits fichiers ou augmenter pour ceux qui sont plus grand. Car moins de blocks occupe un fichier, plus vite il sera chargé par le namenode.

* fs.checkpoint.dir : le(s) répertoire(s) où secondary namenode va stocker les fichiers temporaires du système à fusionner avec le namenode principal.

* dfs.hosts.exclude : indique le fichier qui stocke une liste des machines qui ne sont pas autorisées à se connecter au namenode.

* dfs.hosts : indique le fichier qui stocke une liste des machines qui sont autorisées à se connecter au namenode.

* dfs.datanode.ipc.address : l'adresse IPC et le port du datanode.

* dfs.datanode.du.reserved : par défaut datanodes vont tenter d'occuper tout l'espace disponible dans leur répertoire de stockage. Cette propriété permet de spécifier l'espace réservé aux autres types de données, pas liés aux datanodes.

* fs.trash.interval : le système de corbeille dans Hadoop se base sur le déplacement d'un fichier supprimé dans un répertoire temporaire. Ensuite, ce fichier y reste pendant le temps déterminé en minutes dans cette propriété. Au bout de ce temps, il est définitivement supprimé du système.

* dfs.replication : le nombre des replicatas à effectuer pour chaque block. La valeur par défaut est 3.

* dfs.replication.(min|max) : le nombre minimal ou maximal des replicatas (par défaut 1 min. et 512 max.)

* dfs.namenode.secondary.http-address : l'adresse et le port du secondary namenode.

* dfs.datanode.address : l'adresse et le port du serveur datanode utilisé pour le transfert des données.

* dfs.namenode.name.dir : le répertoire ou une liste des répertories séparés par une virgule, où le système de fichiers distribués va placer fsimage du namenode.

* dfs.datanode.data.dir : comme dans le cas du dfs.namenode.name.dir, sauf que c'est destiné au stockage des blocks du datanode. Les répertoires inexistants ne sont pas pris en compte.

Options MapReduce
* io.sort.mb : la taille de la mémoire allouée au tri des données pendant la phase map. Quand le contenu du tampon de la mémoire atteint 80%, Hadoop commence à l'écrire sur le disque, tout en continuant d'accepter les données dans le tampon.

* io.sort.factor : le nombre des flux à fusionner parallèlement pendant le tri des fichiers dans la phase shuffle. Il permet à déterminer le nombre boucles qui devront fusionner les fichiers issus de la phase map en un seul. Par exemple, si sa valeur est 5 et on a 10 résultats à traiter, on va avoir 2 fichiers finaux. Chaque fichier sera composé de 5 résultats de la phase map.

* mapred.job.tracker : l'adresse du jobtracker

* mapred.task.profile.(maps|reduces) : le rang des tâches map ou reduce qui doivent être analysées par le profiler.

* job.end.notification.url : l'adresse à laquelle sera envoyée la notification suite à la réalisation de la dernière tâche par tous les tasktrackers.

* mapred.(map|reduce).max.attempts : chaque fois où jobtracker apprend qu'une tâche du tasktracker a échoué, il essaie de la reprogrammer. Le nombre de reprogrammations admises est renseigné dans cette propriété.

* mapred.max.(map|reduce).failures.percent : indique le pourcentage des tâches qui peuvent échouer. Dans certains cas, quelques pourcents des tâches échouées ne doivent pas forcément invalider les résultats du MapReduce.

* mapred.reduce.parallel.copies : le nombre de threads du reduce task qui peuvent tourner en parallèle pour copier les résultats de traitement du map task (5 par défaut).

* mapred.compress.map.output : par défaut false. Si true, le résultat de traiement du map task sera compressé.

* mapred.tasktracker.http.threads : le nombre des threads qui seront utilisés par tasktracker pour récupérer les résultats de map task à travers HTTP.

* mapred.job.reduce.input.buffer.percent : le pourcentage de la mémoire occupée par les résultats du map. Ce pourcentage est relatif à la taille maximale du heap. La phase reduce ne peut commencer que la mémoire utilisée par ces résultats est inférieure à la valeur de ce champ de configuration.

* mapred.input.dir : une liste des répertoires séparés par des virgules avec des fichiers d'entrée.

* mapred.(min|max).split.size : la taille minimale et maximale de l'entrée de la phase map. A partir de cette taille l'entrée peut être divisée en plusieurs sous-parties.

* mapred.system.dir : le chemin vers le répertoire dans lequel MapReduce va stocker les fichiers partagés tels que les fichiers de configuration, les JARs etc.

* mapred.output.compress : indique si le résultat du traitement MapReduce devrait être compressé. Si true, on peut renseigner les propriétés mapred.output.compression.type, mapred.output.compression.codec pour préciser comment (quel type et avec quel algorithme) la compression doit se faire.

* mapred.local.dir : le ou les répertoires (séparés par une virgule) qui stockent les fichiers de "travail" des tâches MapReduce.

* mapred.system.dir : le répertoire où les fichiers partagés (ex. JAR, configuration utilisée par tasktrackers pour tourner les tâches MapReduce) sont stockés.

* mapred.tasktracker.map.tasks.maximum / mapred.task.tracker.reduce.tasks.maximum : réflète le nombre de cores disponibles sur tasktracker machines.

* mapred.child.java.opts : mémoire disponible pour la JVM de l'enfant du tasktracker.

* mapred.hosts : liste des machines autorisées dans le cluster (tasktrackers).

* mapred.hosts.exclude : exclusion machiens du cluster (tasktrackers)

* mapred.job.tracker.info.port : le numéro de port pour le jobtracker du MapReduce.

* mapred.task.tracker.(output|report).port : le numéro de port de départ pour le tasktracker du MapReduce. A partir de ce port le tasktracker va chercher un port libre.

* mapreduce.job.classloader : une valeur booléenne qui indique s'il faut utiliser un classloader séparé pour les JVM des tâches. Si true, la propriété mapreduce.job.classloader.system.classes doit également être renseignée avec une liste des classes à charger. Les classes doivent être séparées par une virgule.

* mapreduce.job.reduce.shuffle.consumer.plugin.class : la classe que les tâches reduce vont utiliser pour envoyer les requêtes shuffle. La classe doit implémenter l'interface org.apache.hadoop.mapred.ShuffleConsumerPlugin.

* mapreduce.jobtracker.hosts.exclude.filename : indique le fichier qui contient une liste des hôtes qui devraient être exclus par jobtracker.

* mapreduce.jobtracker.hosts.filename : indique le fichier qui contient une liste des machines autorisées à se connecter au jobtracker. Si cette entrée est vide, cela signifie que toutes les machines y sont autorisées.

* mapreduce.job.userlog.retain.hours : le nombre d'heures pendnat lesquelles les logs utilisateurs sont stockées sur la machine après la complétion d'une tâche.

* mapreduce.jobtracker.maxtasks.perjob : le nombre maximal des sous-tâches à effectuer. La valeur -1 signifie aucune limitation.

* mapreduce.(map|reduce).speculative : si true, cela veut dire que plusieurs instances de la même sous-tâche map ou reduce peuvent s'exécuter en parallèle. Il s'agit d'une notion "speculative execution of tasks". Son principe se base sur le temps d'exécution des sous-tâches. Si Hadoop détecte qu'une sous-tâche s'exécute trop lentement, il va essayer de tourner la même sous-tâche après que toutes les autres sous-tâches de la tâche soient lancées. Si finalement la sous-tâche marquée comme lente se termine correctement, toutes les sous-tâches speculatives sont détruites. Si la sous-tâche specualtive se termine avant la sous-tâche d'origine, son résultat est pris en compte.

* mapreduce.task.timeout : le nombre de milisecondes après lesquels une sous-tâche sera terminée si elle ne fait ni d'écriture, ni de lecture. Si 0, le timeout est vérouillé.

* mapreduce.job.(maps|reduces) : le nombre des sous-tâches par tâche.

Les autres options
* fs.file.impl : le système de fichiers utilisé. Si l'on personnalise cette entrée, on peut utiliser, par exemple, des méthodes de checksum différentes (possible avec la valeur org.apache.hadoop.fs.RawLocalFileSystem).

* hadoop.job.ugi : le groupe de l'utilisateur qui fait tourner Hadoop

* io.file.buffer.size : détermine la taille du tampon utilisée pendant les opérations d'écriture et de lecture des fichiers de séquence.

* hadoop.job.history.location : l'endroit dans lequel Hadoop va stocker les fichiers historiques traités par les jobtrackers.

Comment définir les fichiers de configuration Hadoop ?
La configuration Hadoop est composée des éléments suivants :
-- configuration
------ property : la propriété de la configuration, composée des éléments suivants :
---------- name : le nom de la propriété.
---------- value : la valeur de la propriété. On peut y utiliser des variables du système (disponibles dans l'appel du System.getProperties().
---------- final : true ou false qui sert à indiquer sur la propriété est finale. Si c'est le cas, elle ne pourra pas être modifiée suite à, par exemple, une nouvelle définition dans un autre fichier de configuration. Les paramètres finaux se trouvent d'habitude dans le fichier core-site.xml.
---------- description : attribut optionnel qui peut servir à déterminer un commentaire lié à une propriété de configuration.

Les propriétés de configuration sont définies dans plusieurs fichiers XML :
- core-site.xml : il stocke la configuration pour le coeur du système Hadoop. On y retrouve les propriétés communes au MapReduce et à l'HDFS.
- hdfs-site.xml : il contient les données liées à l'HDFS (namenode, secondary namenode, datanode)
- mapred-site.xml : ici on retrouve la configuration du MapReduce (jobtracker, tasktracker; généralement les entrées précédées par le préfixe "mapreduce.").

La configuration de l'Hadoop n'est pas à prendre à la légère. Avec la multitude d'options on peut aussi bien rendre les opérations plus performantes que mettre l'application en vrac.
Bartosz KONIECZNY 08-09-2013 16:11 Hadoop
Moi

Développeur d'applications Internet et journaliste passionné par l'adjectif français. Un aigle polonais orienté vers la progression, volant très haut et écoutant du zouk après les matches du foot français.

Vous appréciez mon travail ?

Pour contribuer au développement de ce site, ou pour remercier pour des articles rédigés, vous pouvez faire un don.

Un conseil Symfony2

Comment exécuter son code dans le listener ?

Pour ce faire il suffit de surcharger la méthode handle(Request $request, $type = self::MASTER_REQUEST, $catch = true) qui se trouve dans AppKernel.php .

Une nouvelle méthode handle() peut se présenter ainsi :

function handle(Request $request, $type = self::MASTER_REQUEST, $catch = true)
{
  echo 'handle request';

  return parent::handle($request, $type, $catch);
}