Ejemplo de uso de PySpark en Linux y algunos comandos básicos de transformación/acción en Spark

Apache Spark está escrito en lenguaje de programación Scala. Para admitir la programación Python con Spark, la comunidad Apache Spark lanzó la herramienta PySpark. Esta herramienta interactiva puede trabajar con RDD (los datasets distribuidos de Spark) en el lenguaje de programación Python.

Quizás te interese seguir antes el siguiente post: Instalación paso a paso de Spark en Linux y ejecución de PySpark.

En este post veremos un ejemplo de uso a través de la consola de Linux. Usaremos la consola de la distribución CentOS aunque es equivalente para Ubuntu o Mint.

Como primer paso, desde la consola de Linux vamos a crear una carpeta llamada “libros” y vamos a descargar en ella, en texto plano, la novela de terror cósmico “El horror de Dunwich” (1928) de H.P. Lovecraft:

cd
mkdir libros
cd libros
curl https://www.gutenberg.org/files/50133/50133-8.txt > dunwich.txt

Ahora vamos a entrar en PySpark para analizarlo. Tecleamos en la consola de Linux:

pyspark

textFile

Cargamos en libro en un RDD con la función textFile, que cargará el archivo dunwich.txt tomando cada línea como si fuera un registro:

miRDD = sc.textFile("file:///home/mi_usuario/libros")

El comando anterior cargará todos los ficheros que haya en la carpeta “libros”. En nuestro caso solo debe estar el recién descargado dunwich.txt. Su tuviéramos más libros en la carpeta y quisiéramos tenerlos separados, podríamos usar wholeTextFile. No olvides sustituir en el comando anterior “mi_usuario” por el nombre real de tu usuario en la máquina Linux.

Sample

Ahora podemos obtener una muestra de los datos, un subconjunto de nuestro RDD:

miRDD.takeSample(False, 20)

Con el comando anterior estoy indicando que me guarde una muestra de 20 datos aleatorios sin repeticiones en salidaDriver. Si en lugar de “20” ponemos un número entre 0 y 1 (por ejemplo 0.6), nos haría una selección aleatoria de datos siguiendo una distribución de Bernoulli con la probabilidad indicada. Para indicar una semilla a la generación aleatoria añadimos un tercer término, por ejemplo: miRDD.takeSample(False, 0.6, 1).

Si tecleamos salidaDriver en la consola deben salirnos las 20 líneas aleatorias de El Horror de Dunwich en pantalla.

takeSample

A diferencia de sample, que coge una muestra, que puede ser de gran tamaño, y la mantiene en el clúster, takeSample toma una muestra -generalmente pequeña- y la trae a nuestro ordenador para que la analicemos. Igual que en el caso anterior, False indica que sea una muestra sin reposición:

miRDD.takeSample(False, 5)

Collect

La acción de collect devuelve un array con el RDD completo. Por tanto, puedo visualizar el registro completo (todo el libro) con el siguiente comando:

miRDD.collect()

Count

Devuelve al driver el número de elementos del RDD. Así que podemos ver el número de líneas de mi libro de la siguiente forma:

miRDD.count()

First

Devuelve el primer elemento del RDD. En nuestro caso es el título del libro “The Dunwich Horror“:

miRDD.first()

Take

Devuelve los n primeros elementos de mi RDD:

miRDD.take(10)

getNumPartitions

Podemos ver el número de particiones usadas:

miRDD.getNumPartitions()

Que será solo de una dado el poco volumen de datos con el que estamos trabajando en este ejemplo. Si tuviéramos varias particiones y estuviéramos en un clúster lo normal es que dichas particiones estén repartidas por diversos clústeres.

Glom

Podemos ver los datos en cada partición (para pruebas con pocos datos) con el siguiente comando:

miRDD.glom().collect()

Caché / Persist

Si vamos a trabajar mucho sobre un mismo RDD, es conveniente moverlo a memoria para evitar que Spark esté generándolo en cada acción que hagamos. Podemos hacerlo con este código:

miRDD.cache()

Existe otra función, llamada persist, que nos permite configurar en detalle los cacheos. Por ejemplo, quiero cargarlo el RDD en memoria pero lo que no cabe en memoria lo pase a disco (memoria+disco). O quizás lo que me interesa que lo guarde en memoria pero con varias replicaciones. Son opciones que la función persist permite hacer.

Map

Vamos ahora a crear un nuevo RDD modificando el original. Para ello usamos la función map que por cada registro emite otro registro transformado según una función.

Para probarlo, vamos a crear un nuevo RDD que contendrá el número de caracteres de cada registro (de cada fila del texto) del libro:

nuevoRDD = miRDD.map(lambda record: len(record)

Hemos usado una función lambda de Python para reducir el código. El resultado se puede ver así:

nuevoRDD.collect()

Filter

Ahora vamos a probar la función filter, que filtra decidiendo si un registro se emite o no.

En primer lugar creamos una función para crear las condiciones de filtros que deseamos. En nuestro caso queremos obtener exclusivamente aquellos registros (frases del libro) que contengan más de 71 caracteres.

def miFuncion(record):
    if len(record) > 71:
       return True
    else:
       return False

Cómo vimos en el map anterior la mayoría de los registros tiene 70/71 caracteres, por lo que con esta función vamos a obtener pocos valores. Ejecutamos el filter:

nuevoRDD = miRDD.filter(miFuncion)

Y comprobamos el resultado obtenido:

nuevoRDD.collect()

Por lo que sólo un registro de todo el libro alcanza más de 71 caracteres:

“_Ygnaiih … ygnaiih … thflthkh’ngha … Yog-Sothoth…._” rang the hideous croaking out of space. “_Y’bthnk … h’ehye … n’grkdl’lh…._”

El horror de Dunwich” – H.P. Lovecraft.
Para saber más:
RDD Programming Guide.
Apache Spark para principiantes.
Comandos básicos PySpark.

Deja una respuesta