Pyspark.sql.DataFrameWriter.saveAsTable()
Mai întâi, vom vedea cum să scriem PySpark DataFrame existent în tabel folosind funcția write.saveAsTable(). Este nevoie de numele tabelului și de alți parametri opționali, cum ar fi moduri, partionBy etc., pentru a scrie DataFrame în tabel. Se pastreaza ca dosar de parchet.
Sintaxă:
dataframe_obj.write.saveAsTable(cale/nume_tabel,mod,partitionBy,...)
- Table_name este numele tabelului care este creat din dataframe_obj.
- Putem adăuga/suprascrie datele tabelului folosind parametrul mode.
- PartitionBy preia coloanele simple/multiple pentru a crea partiții pe baza valorilor din aceste coloane furnizate.
Exemplul 1:
Creați un PySpark DataFrame cu 5 rânduri și 4 coloane. Scrieți acest cadru de date într-un tabel numit „Agri_Table1”.
import pyspark
din pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date agricole cu 5 rânduri și 5 coloane
agri =[{ „Soil_Type” : 'Negru' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 2500 , „Starea_solului” : 'Uscat' ,
'Țară' : 'STATELE UNITE ALE AMERICII' },
{ „Soil_Type” : 'Negru' , „Disponibilitate_irigare” : 'Da' , „Acri” : 3500 , „Starea_solului” : 'Umed' ,
'Țară' : 'India' },
{ „Soil_Type” : 'Roșu' , „Disponibilitate_irigare” : 'Da' , „Acri” : 210 , „Starea_solului” : 'Uscat' ,
'Țară' : 'REGATUL UNIT' },
{ „Soil_Type” : 'Alte' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 1000 , „Starea_solului” : 'Umed' ,
'Țară' : 'STATELE UNITE ALE AMERICII' },
{ „Soil_Type” : 'Nisip' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 500 , „Starea_solului” : 'Uscat' ,
'Țară' : 'India' }]
# creați cadrul de date din datele de mai sus
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Scrieți DataFrame de mai sus în tabel.
agri_df.coalesce( 1 ).write.saveAsTable( „Agri_Tabel1” )
Ieșire:
Putem vedea că un fișier parchet este creat cu datele PySpark anterioare.
Exemplul 2:
Luați în considerare DataFrame-ul anterior și scrieți „Agri_Table2” în tabel prin partiționarea înregistrărilor pe baza valorilor din coloana „Țară”.
# Scrieți DataFrame de mai sus în tabel cu parametrul partitionByagri_df.write.saveAsTable( „Agri_Table2” ,partitionBy=[ 'Țară' ])
Ieșire:
Există trei valori unice în coloana „Țară” – „India”, „Marea Britanie” și „SUA”. Deci, sunt create trei partiții. Fiecare compartimentare deține dosarele de parchet.
Pyspark.sql.DataFrameReader.table()
Să încărcăm tabelul în PySpark DataFrame folosind funcția spark.read.table(). Este nevoie de un singur parametru, care este calea/numele tabelului. Încarcă direct tabelul în PySpark DataFrame și toate funcțiile SQL care sunt aplicate PySpark DataFrame pot fi, de asemenea, aplicate pe acest DataFrame încărcat.
Sintaxă:
spark_app.read.table(cale/‘Nume_tabel’)În acest scenariu, folosim tabelul anterior care a fost creat din PySpark DataFrame. Asigurați-vă că trebuie să implementați fragmentele de cod ale scenariului precedent în mediul dvs.
Exemplu:
Încărcați tabelul „Agri_Table1” în DataFrame numit „loaded_data”.
loaded_data = linuxhint_spark_app.read.table( „Agri_Table1” )loaded_data.show()
Ieșire:
Putem vedea că tabelul este încărcat în PySpark DataFrame.
Executarea interogărilor SQL
Acum, executăm câteva interogări SQL pe DataFrame încărcat folosind funcția spark.sql().
# Utilizați comanda SELECT pentru a afișa toate coloanele din tabelul de mai sus.linuxhint_spark_app.sql( „SELECT * din Agri_Table1” ).spectacol()
# WHERE clauza
linuxhint_spark_app.sql( 'SELECT * din Agri_Table1 WHERE Soil_status='Dry' ' ).spectacol()
linuxhint_spark_app.sql( „SELECT * din Agri_Table1 WHERE Acres > 2000” ).spectacol()
Ieșire:
- Prima interogare afișează toate coloanele și înregistrările din DataFrame.
- A doua interogare afișează înregistrările bazate pe coloana „Soil_status”. Există doar trei înregistrări cu elementul „Uscat”.
- Ultima interogare returnează două înregistrări cu „Acri” care sunt mai mari de 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Folosind funcția insertInto(), putem adăuga DataFrame în tabelul existent. Putem folosi această funcție împreună cu selectExpr() pentru a defini numele coloanelor și apoi a o introduce în tabel. Această funcție ia, de asemenea, tableName ca parametru.
Sintaxă:
DataFrame_obj.write.insertInto(’Nume_tabel’)În acest scenariu, folosim tabelul anterior care a fost creat din PySpark DataFrame. Asigurați-vă că trebuie să implementați fragmentele de cod ale scenariului precedent în mediul dvs.
Exemplu:
Creați un nou DataFrame cu două înregistrări și introduceți-le în tabelul „Agri_Table1”.
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date agricole cu 2 rânduri
agri =[{ „Soil_Type” : 'Nisip' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 2500 , „Starea_solului” : 'Uscat' ,
'Țară' : 'STATELE UNITE ALE AMERICII' },
{ „Soil_Type” : 'Nisip' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 1200 , „Starea_solului” : 'Umed' ,
'Țară' : 'Japonia' }]
# creați cadrul de date din datele de mai sus
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Acri' , 'Țară' , „Disponibilitate_irigare” , „Soil_Type” ,
„Starea_solului” ).write.insertInto( „Agri_Tabel1” )
# Afișează Agri_Table1 final
linuxhint_spark_app.sql( „SELECT * din Agri_Table1” ).spectacol()
Ieșire:
Acum, numărul total de rânduri care sunt prezente în DataFrame este 7.
Concluzie
Acum înțelegeți cum să scrieți PySpark DataFrame în tabel folosind funcția write.saveAsTable(). Ia numele tabelului și alți parametri opționali. Apoi, am încărcat acest tabel în PySpark DataFrame folosind funcția spark.read.table(). Este nevoie de un singur parametru, care este calea/numele tabelului. Dacă doriți să adăugați noul DataFrame în tabelul existent, utilizați funcția insertInto().