Cum să citiți și să scrieți datele unui tabel în PySpark

Cum Sa Cititi Si Sa Scrieti Datele Unui Tabel In Pyspark



Procesarea datelor în PySpark este mai rapidă dacă datele sunt încărcate sub formă de tabel. Cu aceasta, folosind expresiile SQL, procesarea va fi rapidă. Deci, conversia PySpark DataFrame/RDD într-un tabel înainte de a-l trimite pentru procesare este cea mai bună abordare. Astăzi, vom vedea cum să citim datele din tabel în PySpark DataFrame, să scriem PySpark DataFrame în tabel și să inserăm un nou DataFrame în tabelul existent utilizând funcțiile încorporate. Să mergem!

Pyspark.sql.DataFrameWriter.saveAsTable()

Mai întâi, vom vedea cum să scriem PySpark DataFrame existent în tabel folosind funcția write.saveAsTable(). Este nevoie de numele tabelului și de alți parametri opționali, cum ar fi moduri, partionBy etc., pentru a scrie DataFrame în tabel. Se pastreaza ca dosar de parchet.

Sintaxă:







dataframe_obj.write.saveAsTable(cale/nume_tabel,mod,partitionBy,...)
  1. Table_name este numele tabelului care este creat din dataframe_obj.
  2. Putem adăuga/suprascrie datele tabelului folosind parametrul mode.
  3. PartitionBy preia coloanele simple/multiple pentru a crea partiții pe baza valorilor din aceste coloane furnizate.

Exemplul 1:

Creați un PySpark DataFrame cu 5 rânduri și 4 coloane. Scrieți acest cadru de date într-un tabel numit „Agri_Table1”.



import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date agricole cu 5 rânduri și 5 coloane

agri =[{ „Soil_Type” : 'Negru' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 2500 , „Starea_solului” : 'Uscat' ,
'Țară' : 'STATELE UNITE ALE AMERICII' },

{ „Soil_Type” : 'Negru' , „Disponibilitate_irigare” : 'Da' , „Acri” : 3500 , „Starea_solului” : 'Umed' ,
'Țară' : 'India' },

{ „Soil_Type” : 'Roșu' , „Disponibilitate_irigare” : 'Da' , „Acri” : 210 , „Starea_solului” : 'Uscat' ,
'Țară' : 'REGATUL UNIT' },

{ „Soil_Type” : 'Alte' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 1000 , „Starea_solului” : 'Umed' ,
'Țară' : 'STATELE UNITE ALE AMERICII' },

{ „Soil_Type” : 'Nisip' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 500 , „Starea_solului” : 'Uscat' ,
'Țară' : 'India' }]



# creați cadrul de date din datele de mai sus

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Scrieți DataFrame de mai sus în tabel.

agri_df.coalesce( 1 ).write.saveAsTable( „Agri_Tabel1” )

Ieșire:







Putem vedea că un fișier parchet este creat cu datele PySpark anterioare.



Exemplul 2:

Luați în considerare DataFrame-ul anterior și scrieți „Agri_Table2” în tabel prin partiționarea înregistrărilor pe baza valorilor din coloana „Țară”.

# Scrieți DataFrame de mai sus în tabel cu parametrul partitionBy

agri_df.write.saveAsTable( „Agri_Table2” ,partitionBy=[ 'Țară' ])

Ieșire:

Există trei valori unice în coloana „Țară” – „India”, „Marea Britanie” și „SUA”. Deci, sunt create trei partiții. Fiecare compartimentare deține dosarele de parchet.

Pyspark.sql.DataFrameReader.table()

Să încărcăm tabelul în PySpark DataFrame folosind funcția spark.read.table(). Este nevoie de un singur parametru, care este calea/numele tabelului. Încarcă direct tabelul în PySpark DataFrame și toate funcțiile SQL care sunt aplicate PySpark DataFrame pot fi, de asemenea, aplicate pe acest DataFrame încărcat.

Sintaxă:

spark_app.read.table(cale/‘Nume_tabel’)

În acest scenariu, folosim tabelul anterior care a fost creat din PySpark DataFrame. Asigurați-vă că trebuie să implementați fragmentele de cod ale scenariului precedent în mediul dvs.

Exemplu:

Încărcați tabelul „Agri_Table1” în DataFrame numit „loaded_data”.

loaded_data = linuxhint_spark_app.read.table( „Agri_Table1” )

loaded_data.show()

Ieșire:

Putem vedea că tabelul este încărcat în PySpark DataFrame.

Executarea interogărilor SQL

Acum, executăm câteva interogări SQL pe DataFrame încărcat folosind funcția spark.sql().

# Utilizați comanda SELECT pentru a afișa toate coloanele din tabelul de mai sus.

linuxhint_spark_app.sql( „SELECT * din Agri_Table1” ).spectacol()

# WHERE clauza

linuxhint_spark_app.sql( 'SELECT * din Agri_Table1 WHERE Soil_status='Dry' ' ).spectacol()

linuxhint_spark_app.sql( „SELECT * din Agri_Table1 WHERE Acres > 2000” ).spectacol()

Ieșire:

  1. Prima interogare afișează toate coloanele și înregistrările din DataFrame.
  2. A doua interogare afișează înregistrările bazate pe coloana „Soil_status”. Există doar trei înregistrări cu elementul „Uscat”.
  3. Ultima interogare returnează două înregistrări cu „Acri” care sunt mai mari de 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Folosind funcția insertInto(), putem adăuga DataFrame în tabelul existent. Putem folosi această funcție împreună cu selectExpr() pentru a defini numele coloanelor și apoi a o introduce în tabel. Această funcție ia, de asemenea, tableName ca parametru.

Sintaxă:

DataFrame_obj.write.insertInto(’Nume_tabel’)

În acest scenariu, folosim tabelul anterior care a fost creat din PySpark DataFrame. Asigurați-vă că trebuie să implementați fragmentele de cod ale scenariului precedent în mediul dvs.

Exemplu:

Creați un nou DataFrame cu două înregistrări și introduceți-le în tabelul „Agri_Table1”.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date agricole cu 2 rânduri

agri =[{ „Soil_Type” : 'Nisip' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 2500 , „Starea_solului” : 'Uscat' ,
'Țară' : 'STATELE UNITE ALE AMERICII' },

{ „Soil_Type” : 'Nisip' , „Disponibilitate_irigare” : 'Nu' , „Acri” : 1200 , „Starea_solului” : 'Umed' ,
'Țară' : 'Japonia' }]

# creați cadrul de date din datele de mai sus

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Acri' , 'Țară' , „Disponibilitate_irigare” , „Soil_Type” ,
„Starea_solului” ).write.insertInto( „Agri_Tabel1” )

# Afișează Agri_Table1 final

linuxhint_spark_app.sql( „SELECT * din Agri_Table1” ).spectacol()

Ieșire:

Acum, numărul total de rânduri care sunt prezente în DataFrame este 7.

Concluzie

Acum înțelegeți cum să scrieți PySpark DataFrame în tabel folosind funcția write.saveAsTable(). Ia numele tabelului și alți parametri opționali. Apoi, am încărcat acest tabel în PySpark DataFrame folosind funcția spark.read.table(). Este nevoie de un singur parametru, care este calea/numele tabelului. Dacă doriți să adăugați noul DataFrame în tabelul existent, utilizați funcția insertInto().