PySpark Read.Parquet()

Pyspark Read Parquet



În PySpark, funcția write.parquet() scrie DataFrame în fișierul parchet, iar read.parquet() citește fișierul parchet în PySpark DataFrame sau orice altă sursă de date. Pentru a procesa coloanele în Apache Spark rapid și eficient, trebuie să comprimăm datele. Comprimarea datelor ne salvează memoria și toate coloanele sunt convertite în nivel plat. Asta înseamnă că spațiul de stocare plat la nivel de coloană există. Fișierul care le stochează este cunoscut sub numele de fișier PARQUET.

În acest ghid, ne vom concentra în principal pe citirea/încărcarea fișierului parchet în PySpark DataFrame/SQL folosind funcția read.parquet() care este disponibilă în clasa pyspark.sql.DataFrameReader.

Subiect de conținut:







Obțineți fișierul parchet



Citiți fișierul Parquet în PySpark DataFrame



Citiți fișierul Parquet la PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Această funcție este folosită pentru a citi fișierul parchet și a-l încărca în PySpark DataFrame. Preia calea/numele fișierului de parchet. Putem folosi pur și simplu funcția read.parquet() deoarece aceasta este funcția generică.

Sintaxă:



Să vedem sintaxa read.parquet():

spark_app.read.parquet(nume_fișier.parquet/cale)

Mai întâi, instalați modulul PySpark folosind comanda pip:

pip install pyspark

Obțineți fișierul parchet

Pentru a citi un fișier de parchet, aveți nevoie de datele în care este generat fișierul de parchet din acele date. În această parte, vom vedea cum să generăm un fișier de parchet din PySpark DataFrame.

Să creăm un PySpark DataFrame cu 5 înregistrări și să scriem acest lucru în fișierul de parchet „industry_parquet”.

import pyspark

din pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# creați cadrul de date care stochează detaliile industriei

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agricultură' ,Zona= 'STATELE UNITE ALE AMERICII' ,
Evaluare= 'Fierbinte' ,Total_angajați= 100 ),

Rând(Tip= 'Agricultură' ,Zona= 'India' ,Evaluare= 'Fierbinte' ,Total_angajați= 200 ),

Rând(Tip= 'Dezvoltare' ,Zona= 'STATELE UNITE ALE AMERICII' ,Evaluare= 'Cald' ,Total_angajați= 100 ),

Rând(Tip= 'Educaţie' ,Zona= 'STATELE UNITE ALE AMERICII' ,Evaluare= 'Misto' ,Total_angajați= 400 ),

Rând(Tip= 'Educaţie' ,Zona= 'STATELE UNITE ALE AMERICII' ,Evaluare= 'Cald' ,Total_angajați= douăzeci )

])

# Cadrul de date real

industria_df.show()

# Scrie industria_df în fișierul parchet

industria_df.coalesce( 1 ).scrie.parchet( „parchet_industrie” )

Ieșire:

Acesta este DataFrame care deține 5 înregistrări.

Se creează un fișier de parchet pentru DataFrame anterior. Aici, numele nostru de fișier cu o extensie este „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Folosim acest fișier în întregul tutorial.

Citiți fișierul Parquet în PySpark DataFrame

Avem dosarul de parchet. Să citim acest fișier folosind funcția read.parquet() și să îl încărcăm în PySpark DataFrame.

import pyspark

din pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți fișierul parchet în obiectul dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet” )

# Afișează dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Ieșire:

Afișăm DataFrame folosind metoda show() care a fost creată din fișierul parchet.

Interogări SQL cu fișier Parquet

După încărcarea în DataFrame, poate fi posibilă crearea tabelelor SQL și afișarea datelor care sunt prezente în DataFrame. Trebuie să creăm o vizualizare TEMPORARĂ și să folosim comenzile SQL pentru a returna înregistrările din DataFrame care este creat din fișierul parchet.

Exemplul 1:

Creați o vizualizare temporară numită „Sectoare” și utilizați comanda SELECT pentru a afișa înregistrările în DataFrame. Vă puteți referi la asta tutorial care explică cum să creați o vizualizare în Spark – SQL.

import pyspark

din pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți fișierul parchet în obiectul dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet” )

# Creați vizualizare din fișierul de parchet de mai sus numit - „Sectoare”

dataframe_from_parquet.createOrReplaceTempView( „Sectoare” )

# Interogare pentru a afișa toate înregistrările din sectoare

linuxhint_spark_app.sql( „selectați * din sectoare” ).spectacol()

Ieșire:

Exemplul 2:

Folosind VIEW anterioară, scrieți interogarea SQL:

  1. Pentru a afișa toate înregistrările din sectoarele care aparțin „India”.
  2. Pentru a afișa toate înregistrările din sectoarele cu un angajat care este mai mare de 100.
# Interogare pentru a afișa toate înregistrările din sectoarele aparținând „India”.

linuxhint_spark_app.sql( 'selectați * din sectoarele unde Area='India'' ).spectacol()

# Interogare pentru a afișa toate înregistrările din sectoarele cu angajați mai mari de 100

linuxhint_spark_app.sql( „selectați * din sectoarele în care Total_angajați>100” ).spectacol()

Ieșire:

Există o singură înregistrare cu zonă care este „India” și două înregistrări cu angajați care depășesc 100.

Citiți fișierul Parquet la PySpark SQL

Mai întâi, trebuie să creăm o vizualizare folosind comanda CREATE. Folosind cuvântul cheie „cale” din interogarea SQL, putem citi fișierul parchet către Spark SQL. După cale, trebuie să specificăm numele/locația fișierului.

Sintaxă:

spark_app.sql( „CREATE TEMPORARY VIEW view_name FOLOSIND OPȚIUNI DE parchet (cale „ nume_fișier.parchet ')' )

Exemplul 1:

Creați o vizualizare temporară numită „Sector2” și citiți fișierul parchet în ea. Folosind funcția sql(), scrieți interogarea select pentru a afișa toate înregistrările care sunt prezente în vizualizare.

import pyspark

din pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți fișierul parchet în Spark-SQL

linuxhint_spark_app.sql( „CREAȚI VEDERE TEMPORARĂ Sector2 FOLOSIND OPȚIUNI DE parchet (cale „ parte-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Interogare pentru a afișa toate înregistrările din Sector2

linuxhint_spark_app.sql( „selectați * din Sector2” ).spectacol()

Ieșire:

Exemplul 2:

Utilizați vizualizarea anterioară și scrieți interogarea pentru a afișa toate înregistrările cu ratingul „Ferbinte” sau „Rece”.

# Interogare pentru a afișa toate înregistrările din Sector2 cu Rating- Hot sau Cool.

linuxhint_spark_app.sql( 'selectați * din Sector2 unde Rating='Hot' SAU Rating='Cool'' ).spectacol()

Ieșire:

Există trei înregistrări cu calificativul „Fierbinte” sau „Rece”.

Concluzie

În PySpark, funcția write.parquet() scrie DataFrame în fișierul parchet. Funcția read.parquet() citește fișierul parchet în PySpark DataFrame sau în orice altă sursă de date. Am învățat cum să citim fișierul parchet în PySpark DataFrame și în tabelul PySpark. Ca parte a acestui tutorial, am discutat și despre cum să creați tabelele din PySpark DataFrame și să filtrați datele folosind clauza WHERE.