Creación y ejecución de un programa Python para Hadoop Map Reduce en Linux

Vamos a ejecutar un sencillo programa Python en Hadoop Map Reduce. El programa va a calcular la temperatura máxima de cada año a partir de un registro histórico. Para el ejemplo usaremos CentOS aunque es válido para cualquier otra distribución de Linux.


Si no tienes aún instalado Hadoop quizás te interese el siguiente post: Instalación paso a paso de Hadoop en Linux y un ejemplo de uso.

En primer lugar crearemos una carpeta tempMax en el escritorio que usaremos como directorio de trabajo:

Nos ubicamos desde la terminal dentro de esa carpeta:

cd Escritorio
cd tempMax

Creamos el archivo de Python donde vamos a programar nuestro código mapper:

touch mapperMaxTemp.py

Antes de escribir el código mapper, debemos tener en cuenta que nuestros datos estarán representados de la siguiente forma:

Es decir, que cada fila tendrá representado el año, el mes y la temperatura con espacios tabulados (datos ficticios, generados con una función random). Así que cada subproblema será un año. Tendremos que emitir pares clave-valor dónde la clave sea el año y el valor la temperatura.

Una vez creado el archivo mapperMaxTemp.py, accedemos a él desde el escritorio con doble click y escribimos el código mapper:

#!/usr/bin/python
import sys

"""
Mapper de MaxTemp
Obtenido de http://exponentis.es/
"""

# Por cada medida calculamos los pares <anyo, temp>
for linea in sys.stdin:
  linea = linea.strip()
  anyo, mes, temp = linea.split("\t", 2)
  print("%s\t%s" % (anyo, temp))

El código, para cada línea de datos de entrada, en primer lugar elimina espacios en blanco (por delante y por detrás) con el método strip() y posteriormente extrae el año, mes y temperatura de cada fila, “rompiendo” la entrada por cada tabulación (/t) que haya. Por último emitimos con el print la clave-valor separados por una tabulación.

Guardamos el archivo y volviendo a la terminal de Linux, nos damos permisos para poder ejecutar el mapper.

chmod u+x mapperMaxTemp.py

Ahora vamos a crear la funcionalidad reducer:

touch reducerMaxTemp.py

Abrimos el archivo recién creado reducerMaxTemp.py con doble click en su carpeta del escritorio. Ahora tenemos que escribir un código que calcule el máximo de las temperaturas recibidas:

#!/usr/bin/python
import sys

"""
Reducer de MaxTemp
Obtenido de http://exponentis.es/
"""

subproblema = None
tempMaxima = None

for claveValor in sys.stdin:
	anyo, temp = claveValor.split("\t", 1)
	
	#convertimos la temp a float
	temp = float(temp)

	#El primer subproblema es el primer anyo de reducer (y la temp máxima de momento también) 	
	if subproblema == None:
		subproblema = anyo
		tempMaxima = temp

	#si el anyo es del subproblema actual, comprobamos si es la temperatura maxima
	if subproblema == anyo:
		if temp > tempMaxima:
			tempMaxima = temp
	else: #si ya acabamos con el subproblema, emitimos    
		print("%s\t%s" % (subproblema, tempMaxima))
        
		#Pasamos al siguiente subproblema (de momento la temp es la máxima)
		subproblema = anyo
		tempMaxima = temp

#el anterior bucle no emite el último subproblema    
print("%s\t%s" % (subproblema, tempMaxima))

El programa crear muchos pares clave-valor y tenemos que indicar cuando termina cada par. Separamos año y temperatura actuales. En el primer subproblema consideramos máxima la primera temperatura y la comparamos con la del siguiente subproblema, comparando la temperatura actual con la temperatura máxima almacenada. Si es superior la temperatura actual se actualiza la temperatura almacenada.

Esto se realiza para todas las temperaturas hasta que cambia la fecha del año, en el que cambiamos de subproblema, emitiendo la solución del anterior subproblema (año y temperatura máxima).

El último print emite la solución del último subproblema.

Al igual que con el mapper, debemos darnos permisos de ejecución sobre el reducer a través de la consola de Linux:

chmod u+x reducerMaxTemp.py

Ahora descargaremos el fichero medidas.txt en nuestra carpeta de trabajo. Este fichero contiene 730 registros diarios de temperaturas a lo largo de 2017 y 2018, y tiene el formato que vimos anteriormente con el año, mes y temperatura con espacios tabulados. Nota: son datos inventados, con una temperatura máxima diaria establecida con una función random entre -5 ºC y 48 ºC.

Descarga el archivo medidas.txt de aquí: https://mega.nz/#!Pnpw3aYK

Vamos a ejecutar el mapper desde la consola Linux para comprobar que está bien codificado. Nota: la barra vertical es escribe con “Alt Gr + 1”.

cat medidas.txt | ./mapperMaxTemp.py

El resultado debe ser mostrar en pantalla todas las clave-valor que emite mapper, es decir, todas las temperaturas de cada año sin el mes:

Ahora ejecutamos el Map y el Reduce a la vez sin ejecutar Hadoop a modo de prueba:

cat medidas.txt | ./mapperMaxTemp.py | sort -k1,1 | ./reducerMaxTemp.py

El resultado es el máximo de cada año, que coinciden de forma lógica por haberse usado la misma función random:

Viendo que ya funciona bien, podemos ejecutarlo en Hadoop de la siguiente forma:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar -files ./mapperMaxTemp.py -mapper ./mapperMaxTemp.py -file ./reducerMaxTemp.py -reducer ./reducerMaxTemp.py -input medidas.txt -output ./miSalidaMaxTemp1

Con este comando indicamos a Hadoop cuáles son nuestros archivos mapper y reducer y que además tendrá que distribuir por los distintos servidores (por eso salen indicados dos veces). Le indicamos además cuáles son los datos –medidas.txt- y la salida que queremos.

Se nos habrá creado una nueva carpeta en nuestro directorio llamada miSalidaMexTemp1 que contendrá un archivo llamado part-00000 con el resultado del análisis:

Con esto ya habríamos terminado. Pero supongamos que ahora queremos obtener también el máximo de temperatura por cada mes.

Modificamos el mapper añadiendo el mes:

Hemos separado año y mes con un guión y posteriormente la temperatura con una tabulación.

El comando Hadoop seria muy similar al anterior pero debemos indicarle, además de lo anterior, que nuestro mapper va con una clave compuesta (dos valores) y que además van separados por un guión. Debemos además especificar una salida diferente parque Hadoop no sobrescribe la anterior y da un error:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar -Dstream.num.map.key.fields=2 -Dmap.output.key.field.separator="-" -files ./mapperMaxTemp.py -mapper ./mapperMaxTemp.py -file ./reducerMaxTemp.py -reducer ./reducerMaxTemp.py -input medidas.txt -output ./miSalidaMaxTemp2

El resultado en el nuevo archivo part-00000 es la máxima temperatura por mes:

Nota: si quisiéramos añadir un combiner, podría hacerse de la siguiente forma:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar -Dstream.num.map.key.fields=2 -Dmap.output.key.field.separator="-" -files ./mapperMaxTemp.py -mapper ./mapperMaxTemp.py -file ./reducerMaxTemp.py -reducer ./reducerMaxTemp.py -combiner ./reducerMaxTemp.py -input medidas.txt -output ./miSalidaMaxTemp2
Para saber más:
Hadoop – Ejemplos de hadoop-mapreduce-examples.jar
Ejecución de los ejemplos de MapReduce incluidos en HDInsight
Apache Hadoop-MapReduce

3 comentarios en “Creación y ejecución de un programa Python para Hadoop Map Reduce en Linux”

Responder a Gliese710 Cancelar la respuesta