În acest ghid, ne vom concentra în principal pe citirea/încărcarea fișierului parchet în PySpark DataFrame/SQL folosind funcția read.parquet() care este disponibilă în clasa pyspark.sql.DataFrameReader.
Subiect de conținut:
Citiți fișierul Parquet în PySpark DataFrame
Citiți fișierul Parquet la PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Această funcție este folosită pentru a citi fișierul parchet și a-l încărca în PySpark DataFrame. Preia calea/numele fișierului de parchet. Putem folosi pur și simplu funcția read.parquet() deoarece aceasta este funcția generică.
Sintaxă:
Să vedem sintaxa read.parquet():
spark_app.read.parquet(nume_fișier.parquet/cale)Mai întâi, instalați modulul PySpark folosind comanda pip:
pip install pyspark
Obțineți fișierul parchet
Pentru a citi un fișier de parchet, aveți nevoie de datele în care este generat fișierul de parchet din acele date. În această parte, vom vedea cum să generăm un fișier de parchet din PySpark DataFrame.
Să creăm un PySpark DataFrame cu 5 înregistrări și să scriem acest lucru în fișierul de parchet „industry_parquet”.
import pysparkdin pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# creați cadrul de date care stochează detaliile industriei
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agricultură' ,Zona= 'STATELE UNITE ALE AMERICII' ,
Evaluare= 'Fierbinte' ,Total_angajați= 100 ),
Rând(Tip= 'Agricultură' ,Zona= 'India' ,Evaluare= 'Fierbinte' ,Total_angajați= 200 ),
Rând(Tip= 'Dezvoltare' ,Zona= 'STATELE UNITE ALE AMERICII' ,Evaluare= 'Cald' ,Total_angajați= 100 ),
Rând(Tip= 'Educaţie' ,Zona= 'STATELE UNITE ALE AMERICII' ,Evaluare= 'Misto' ,Total_angajați= 400 ),
Rând(Tip= 'Educaţie' ,Zona= 'STATELE UNITE ALE AMERICII' ,Evaluare= 'Cald' ,Total_angajați= douăzeci )
])
# Cadrul de date real
industria_df.show()
# Scrie industria_df în fișierul parchet
industria_df.coalesce( 1 ).scrie.parchet( „parchet_industrie” )
Ieșire:
Acesta este DataFrame care deține 5 înregistrări.
Se creează un fișier de parchet pentru DataFrame anterior. Aici, numele nostru de fișier cu o extensie este „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Folosim acest fișier în întregul tutorial.
Citiți fișierul Parquet în PySpark DataFrame
Avem dosarul de parchet. Să citim acest fișier folosind funcția read.parquet() și să îl încărcăm în PySpark DataFrame.
import pysparkdin pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Citiți fișierul parchet în obiectul dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet” )
# Afișează dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Ieșire:
Afișăm DataFrame folosind metoda show() care a fost creată din fișierul parchet.
Interogări SQL cu fișier Parquet
După încărcarea în DataFrame, poate fi posibilă crearea tabelelor SQL și afișarea datelor care sunt prezente în DataFrame. Trebuie să creăm o vizualizare TEMPORARĂ și să folosim comenzile SQL pentru a returna înregistrările din DataFrame care este creat din fișierul parchet.
Exemplul 1:
Creați o vizualizare temporară numită „Sectoare” și utilizați comanda SELECT pentru a afișa înregistrările în DataFrame. Vă puteți referi la asta tutorial care explică cum să creați o vizualizare în Spark – SQL.
import pysparkdin pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Citiți fișierul parchet în obiectul dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet” )
# Creați vizualizare din fișierul de parchet de mai sus numit - „Sectoare”
dataframe_from_parquet.createOrReplaceTempView( „Sectoare” )
# Interogare pentru a afișa toate înregistrările din sectoare
linuxhint_spark_app.sql( „selectați * din sectoare” ).spectacol()
Ieșire:
Exemplul 2:
Folosind VIEW anterioară, scrieți interogarea SQL:
- Pentru a afișa toate înregistrările din sectoarele care aparțin „India”.
- Pentru a afișa toate înregistrările din sectoarele cu un angajat care este mai mare de 100.
linuxhint_spark_app.sql( 'selectați * din sectoarele unde Area='India'' ).spectacol()
# Interogare pentru a afișa toate înregistrările din sectoarele cu angajați mai mari de 100
linuxhint_spark_app.sql( „selectați * din sectoarele în care Total_angajați>100” ).spectacol()
Ieșire:
Există o singură înregistrare cu zonă care este „India” și două înregistrări cu angajați care depășesc 100.
Citiți fișierul Parquet la PySpark SQL
Mai întâi, trebuie să creăm o vizualizare folosind comanda CREATE. Folosind cuvântul cheie „cale” din interogarea SQL, putem citi fișierul parchet către Spark SQL. După cale, trebuie să specificăm numele/locația fișierului.
Sintaxă:
spark_app.sql( „CREATE TEMPORARY VIEW view_name FOLOSIND OPȚIUNI DE parchet (cale „ nume_fișier.parchet ')' )Exemplul 1:
Creați o vizualizare temporară numită „Sector2” și citiți fișierul parchet în ea. Folosind funcția sql(), scrieți interogarea select pentru a afișa toate înregistrările care sunt prezente în vizualizare.
import pysparkdin pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Citiți fișierul parchet în Spark-SQL
linuxhint_spark_app.sql( „CREAȚI VEDERE TEMPORARĂ Sector2 FOLOSIND OPȚIUNI DE parchet (cale „ parte-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Interogare pentru a afișa toate înregistrările din Sector2
linuxhint_spark_app.sql( „selectați * din Sector2” ).spectacol()
Ieșire:
Exemplul 2:
Utilizați vizualizarea anterioară și scrieți interogarea pentru a afișa toate înregistrările cu ratingul „Ferbinte” sau „Rece”.
# Interogare pentru a afișa toate înregistrările din Sector2 cu Rating- Hot sau Cool.linuxhint_spark_app.sql( 'selectați * din Sector2 unde Rating='Hot' SAU Rating='Cool'' ).spectacol()
Ieșire:
Există trei înregistrări cu calificativul „Fierbinte” sau „Rece”.
Concluzie
În PySpark, funcția write.parquet() scrie DataFrame în fișierul parchet. Funcția read.parquet() citește fișierul parchet în PySpark DataFrame sau în orice altă sursă de date. Am învățat cum să citim fișierul parchet în PySpark DataFrame și în tabelul PySpark. Ca parte a acestui tutorial, am discutat și despre cum să creați tabelele din PySpark DataFrame și să filtrați datele folosind clauza WHERE.