Uso de Spark desde R

Spark
R6Class
Text mining
Big data con Spark y R

El uso de Spark en DataScience es ahora más común por sus grandes beneficios. Algunos son: almacenamiento distribuido, uso de queries como si se estuviera escribiendo en SQL, desarrollo de modelos de machine learning, entre muchos otros.

Rstudio (Posit) ha desarrollado el paquete sparklyr, el cual me parece que es muy completo. Súper recomendado!!

Los ejercicios de este blog provienen del curso de Udemy: “Taming Big Data with Apache Spark and Python - Hands On!” sin embargo se resolvieron con Sparklyr y la conexión se realiza de forma local.

Cargue librerías

library(R6)
library(tidyverse)
library(sparklyr)

Clase R6 spark_conexion.

La clase spark_conexion mantiene toda información y métodos relevantes de la conexión a Spark. Permitiendo reutilizarlas en todo el código.

spark_conexion <- R6::R6Class(
  classname = "conexion",
  public = list(
    initialize = function() {
      return(invisible(self))
    },
    #' @description
    #' Crea nuevo objeto de conexión
    #' @details
    #' Esta función requiere unas variables de ambiente cargadas para poder
    #' functionar.
    connect = function() {
     self$conn_sp <- sparklyr::spark_connect(master = "local")
      return(invisible(self))
    },
    #' @field conn_spark conexion a spack
    conn_sp  = NULL,
    #' @descripcion Método para dplyr::copy_to con conexion a spark
    #' @param tabla_df Data frame
    #' @param tbl_name Nombre de la tabla en spark
    copy_to_sp = function(tabla_df, tbl_name) {
      copy_to(self$conn_sp, tabla_df, tbl_name) 
    },
    #' @descripcion Método para leer tabla de spark
    #' @param tbl_name Nombre de la tabla en spark
    tbl_sp = function(tbl_name) spark_read_table(self$conn_sp, tbl_name)
  )
)

conn <- spark_conexion$new()$connect()

Operaciones básicas con dplyr

Archivo extraído de grouplens.org.

Ventajas de usar spark desde R:

  • Se pueden usar los verbos de dplyr
  • Lazy evaluation: Ver que en la parte superior del resultado aparece: # Source: spark<?> [?? x 2])
sparklyr::spark_read_text(
  conn$conn_sp, 
  name = "movieLens",
  "../../data/u.data"
) %>% 
  separate(line, c("user id","item id","rating","timestamp"), sep = "\t") %>% 
  dplyr::group_by(rating) %>% 
  dplyr::count()
# Source:   SQL [5 x 2]
# Database: spark_connection
# Groups:   rating
  rating     n
  <chr>  <dbl>
1 3      27145
2 1       6110
3 2      11370
4 4      34174
5 5      21201

También permite usar summarise, arrange y operaciones dentro de summarise cómo round, mean.

sp_fake_friends <- sparklyr::spark_read_csv(
  conn$conn_sp,
  name = "fakefriends",
  "../../data/fakefriends.csv",
  header = FALSE,
  columns = c("id", "name", "age", "num_friends")
) %>% 
  dplyr::group_by(age) %>% 
  dplyr::summarise(num_friends = round(mean(num_friends), 1)) %>% 
  dplyr::arrange(age)
sp_fake_friends
# Source:     SQL [?? x 2]
# Database:   spark_connection
# Ordered by: age
   age   num_friends
   <chr>       <dbl>
 1 18           343.
 2 19           213.
 3 20           165 
 4 21           351.
 5 22           206.
 6 23           246.
 7 24           234.
 8 25           198.
 9 26           242.
10 27           228.
# ℹ more rows

La función dplyr::filter puede entrar en conflicto con la funcion sparklyr::filter

sparklyr::spark_read_csv(
  conn$conn_sp,
  name = "fakefriends",
  "../../data/fakefriends.csv",
  header = FALSE,
  columns = c("id", "name", "age", "num_friends")
) %>% 
  dplyr::filter(age == min(age))
# Source:   SQL [8 x 4]
# Database: spark_connection
  id    name    age   num_friends
  <chr> <chr>   <chr> <chr>      
1 106   Beverly 18    499        
2 115   Dukat   18    397        
3 341   Data    18    326        
4 377   Beverly 18    418        
5 404   Kasidy  18    24         
6 439   Data    18    417        
7 444   Keiko   18    472        
8 494   Kasidy  18    194        

Operaciones con texto

Las operaciones con texto también pueden ser usadas mediante verbos o secuencia tidyverse.

  • ft_tokenizer: Esta función permite almacenar las palabras de la fila en una lista.
  • ft_stop_words_remover: Se eliminan las palabras conexión tales como: a, en, entre, o, aquí, aún, con, de, e, y, hay, ...
sparklyr::spark_read_text(
  conn$conn_sp,
  path = "../../data/Book"
) %>% 
  ft_tokenizer(
    input_col = "line",
    output_col = "word_list"
      
  ) %>% 
  ft_stop_words_remover(
    input_col = "word_list",
    output_col = "wo_stop_words"
  ) %>% 
  dplyr::mutate(palabra = explode(wo_stop_words)) %>% 
  dplyr::filter(palabra != "") %>% 
  dplyr::group_by(palabra) %>% 
  dplyr::count() %>% 
  dplyr::filter(palabra != "�") %>% 
  dplyr::arrange(desc(n)) %>% 
  head(10)
# Source:     SQL [10 x 2]
# Database:   spark_connection
# Groups:     palabra
# Ordered by: desc(n)
   palabra      n
   <chr>    <dbl>
 1 business   290
 2 time       168
 3 need       167
 4 new        150
 5 product    128
 6 people     127
 7 get        122
 8 work       120
 9 may        107
10 want       107

Machine Learning

En mi opinión los beneficios que encontré de aplicar ML con sparklyr son:

  • Pipelines: Conjunto de pasos que se desean aplicar al modelo en construcción, es decir, las operaciones a la base, la formula del modelo, seleccion el algoritmo a desarrollar (regresión lineal, árbol de decisión).

  • Algoritmos: Sparklyr usa la librería de ML de Spark, por ende, cuenta con una gran variedad de algoritmos para ser usados.

  • Transformaciones: ft_dplyr_transformer permite aplicar operaciones con dplyr y aplicarlo en el pipeline creado.

Linear Regression

Código
sdf_regresion <- sparklyr::spark_read_text(
  conn$conn_sp,
  path = "../../data/regression.txt"
) %>% 
  separate(line, c("x", "y"), ",") %>% 
  mutate(across(where(is.character), as.numeric))

sdf_regresion
# Source:   SQL [?? x 2]
# Database: spark_connection
       x     y
   <dbl> <dbl>
 1 -1.74  1.66
 2  1.24 -1.18
 3  0.29 -0.4 
 4 -0.13  0.09
 5 -0.39  0.38
 6 -1.79  1.73
 7  0.71 -0.77
 8  1.39 -1.48
 9  1.15 -1.43
10  0.13 -0.07
# ℹ more rows
regresion_pipeline <- sparklyr::ml_pipeline(conn$conn_sp) %>%
  sparklyr::ft_r_formula(y ~ x) %>%
  sparklyr::ml_linear_regression()

partitioned_regresion <- sparklyr::sdf_random_split(
  sdf_regresion,
  training = 0.7,
  testing = 0.3
)

fitted_pipeline <- sparklyr::ml_fit(
  regresion_pipeline,
  partitioned_regresion$training
)

predictions <- sparklyr::ml_transform(
  fitted_pipeline,
  partitioned_regresion$testing
)

predictions
# Source:   table<`sparklyr_tmp_c49e0ccf_7ab1_4b4c_be90_c2838d184847`> [?? x 5]
# Database: spark_connection
       x     y features  label prediction
   <dbl> <dbl> <list>    <dbl>      <dbl>
 1 -3.74  3.75 <dbl [1]>  3.75       3.72
 2 -2.89  2.89 <dbl [1]>  2.89       2.88
 3 -2.58  2.57 <dbl [1]>  2.57       2.57
 4 -2.45  2.44 <dbl [1]>  2.44       2.44
 5 -2.36  2.43 <dbl [1]>  2.43       2.35
 6 -2.29  2.35 <dbl [1]>  2.35       2.28
 7 -2.27  2.19 <dbl [1]>  2.19       2.26
 8 -2.06  1.95 <dbl [1]>  1.95       2.05
 9 -2     2.02 <dbl [1]>  2.02       1.99
10 -1.91  1.83 <dbl [1]>  1.83       1.90
# ℹ more rows