Subiect de conținut:
- PySpark DataFrame în CSV prin conversia în Pandas DataFrame
- PySpark Pandas DataFrame în CSV folosind metoda To_Csv().
- PySpark Pandas DataFrame în CSV prin conversia în NumPy Array
- PySpark DataFrame la CSV folosind metoda Write.Csv().
Dacă doriți să aflați despre PySpark DataFrame și instalarea modulelor, parcurgeți acest lucru articol .
PySpark DataFrame în CSV prin conversia în Pandas DataFrame
To_csv() este o metodă disponibilă în modulul Pandas care convertește Pandas DataFrame în CSV. În primul rând, trebuie să convertim PySpark DataFrame în Pandas DataFrame. Pentru aceasta este folosită metoda toPandas(). Să vedem sintaxa lui to_csv() împreună cu parametrii săi.
Sintaxă:
pandas_dataframe_obj.to_csv(cale/ „nume_fișier.csv” , antet ,index,coloane,mod...)
- Trebuie să specificăm numele fișierului CSV. Dacă doriți să stocați fișierul CSV descărcat într-o anumită locație de pe computer, puteți specifica și calea împreună cu numele fișierului.
- Coloanele sunt incluse dacă antetul este setat la „Adevărat”. Dacă nu aveți nevoie de coloane, setați antetul la „False”.
- Indicii sunt specificați dacă indexul este setat la „True”. Dacă nu aveți nevoie de indici, setați indexul la „False”.
- Parametrul Coloane preia o listă de nume de coloane în care putem specifica ce coloane particulare sunt extrase în fișierul CSV.
- Putem adăuga înregistrările în CSV utilizând parametrul mode. Adăugați – „a” este folosit pentru a face acest lucru.
Exemplul 1: Cu Parametrii Antet și Index
Creați „skills_df” PySpark DataFrame cu 3 rânduri și 4 coloane. Convertiți acest DataFrame în CSV conversia mai întâi în Pandas DataFrame.
import pyspark
din pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date despre competențe cu 3 rânduri și 4 coloane
aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },
{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },
{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills_df = linuxhint_spark_app.createDataFrame(skills)
skills_df.show()
# Convertiți skills_df în Pandas DataFrame
pandas_skills_df= skills_df.toPandas()
print(pandas_skills_df)
# Convertiți acest DataFrame în csv cu antet și index
pandas_skills_df.to_csv( „pandas_skills1.csv” , antet =Adevărat, index=Adevărat)
Ieșire:
Putem vedea că PySpark DataFrame este convertit în Pandas DataFrame. Să vedem dacă este convertit în CSV cu nume de coloane și indici:
Exemplul 2: Adăugați datele la CSV
Creați încă un PySpark DataFrame cu 1 înregistrare și adăugați-l la CSV, care este creat ca parte a primului nostru exemplu. Asigurați-vă că trebuie să setăm antetul la „False” împreună cu parametrul mod. În caz contrar, numele coloanelor sunt, de asemenea, atașate ca un rând.
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
aptitudini =[{ 'id' : 90 , 'persoană' : „Bhargav” , 'deprindere' : 'citind' , 'premiu' : 12000 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills_df = linuxhint_spark_app.createDataFrame(skills)
# Convertiți skills_df în Pandas DataFrame
pandas_skills_df= skills_df.toPandas()
# Adăugați acest DataFrame la fișierul pandas_skills1.csv
pandas_skills_df.to_csv( „pandas_skills1.csv” , mod= 'A' , antet = Fals)
Ieșire CSV:
Putem vedea că un nou rând este adăugat la fișierul CSV.
Exemplul 3: Cu parametrul Coloane
Să avem același DataFrame și să-l convertim în CSV cu două coloane: „persoană” și „premiu”.
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date despre competențe cu 3 rânduri și 4 coloane
aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },
{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },
{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills_df = linuxhint_spark_app.createDataFrame(skills)
# Convertiți skills_df în Pandas DataFrame
pandas_skills_df= skills_df.toPandas()
# Convertiți acest DataFrame în csv cu anumite coloane
pandas_skills_df.to_csv( „pandas_skills2.csv” , coloane=[ 'persoană' , 'premiu' ])
Ieșire CSV:
Putem vedea că numai coloanele „persoană” și „premiu” există în fișierul CSV.
PySpark Pandas DataFrame în CSV folosind metoda To_Csv().
To_csv() este o metodă disponibilă în modulul Pandas care convertește Pandas DataFrame în CSV. În primul rând, trebuie să convertim PySpark DataFrame în Pandas DataFrame. Pentru aceasta este folosită metoda toPandas(). Să vedem sintaxa lui to_csv() împreună cu parametrii săi:
Sintaxă:
pyspark_pandas_dataframe_obj.to_csv(cale/ „nume_fișier.csv” , antet ,index,coloane,...)- Trebuie să specificăm numele fișierului CSV. Dacă doriți să stocați fișierul CSV descărcat într-o anumită locație de pe computer, puteți specifica și calea împreună cu numele fișierului.
- Coloanele sunt incluse dacă antetul este setat la „Adevărat”. Dacă nu aveți nevoie de coloane, setați antetul la „False”.
- Indicii sunt specificați dacă indexul este setat la „True”. Dacă nu aveți nevoie de indici, setați indexul la „False”.
- Parametrul coloane preia o listă de nume de coloane în care putem specifica ce coloane particulare sunt extrase în fișierul CSV.
Exemplul 1: Cu parametrul Coloane
Creați un PySpark Pandas DataFrame cu 3 coloane și convertiți-l în CSV folosind to_csv() cu coloanele „persoană” și „premiu”.
de la pyspark import pandapyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persoană' :[ 'Miere' , „Mouni” , 'se' , 'Radha' ], 'premiu' :[ 1 , 2 , 3 , 4 ]})
print(pyspark_pandas_dataframe)
# Convertiți acest DataFrame în csv cu anumite coloane
pyspark_pandas_dataframe.to_csv( „pyspark_pandas1” , coloane=[ 'persoană' , 'premiu' ])
Ieșire:
Putem vedea că PySpark Pandas DataFrame este convertit în CSV cu două partiții. Fiecare partiție deține 2 înregistrări. De asemenea, coloanele din CSV sunt doar „persoană” și „premiu”.
Fișierul de partiție 1:
Fișierul de partiție 2:
Exemplul 2: Cu parametrul Header
Utilizați DataFrame anterior și specificați parametrul antetului setându-l la „True”.
de la pyspark import pandapyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persoană' :[ 'Miere' , „Mouni” , 'se' , 'Radha' ], 'premiu' :[ 1 , 2 , 3 , 4 ]})
# Convertiți acest DataFrame în csv cu antet.
pyspark_pandas_dataframe.to_csv( „pyspark_pandas2” , antet =Adevărat)
Ieșire CSV:
Putem vedea că PySpark Pandas DataFrame este convertit în CSV cu două partiții. Fiecare partiție deține 2 înregistrări cu nume de coloane.
Fișierul de partiție 1:
Fișierul de partiție 2:
PySpark Pandas DataFrame în CSV prin conversia în NumPy Array
Avem o opțiune de a converti PySpark Pandas DataFrame în CSV prin conversia în matricea Numpy. To_numpy() este o metodă disponibilă în modulul PySpark Pandas care convertește PySpark Pandas DataFrame în matricea NumPy.
Sintaxă:
pyspark_pandas_dataframe_obj.to_numpy()Nu va lua niciun parametru.
Folosind metoda Tofile().
După conversia în matricea NumPy, putem folosi metoda tofile() pentru a converti NumPy în CSV. Aici, stochează fiecare înregistrare într-o nouă celulă, în funcție de coloană, în fișierul CSV.
Sintaxă:
array_obj.to_numpy(filename/path,sep=’ ’)Ia numele fișierului sau calea unui CSV și un separator.
Exemplu:
Creați PySpark Pandas DataFrame cu 3 coloane și 4 înregistrări și convertiți-l în CSV conversia mai întâi într-o matrice NumPy.
de la pyspark import pandapyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persoană' :[ 'Miere' , „Mouni” , 'se' , 'Radha' ], 'premiu' :[ 1 , 2 , 3 , 4 ]})
# Convertiți DataFrame de mai sus în matrice numpy
convertit = pyspark_pandas_dataframe.to_numpy()
imprimare (convertit)
# Folosind tofile()
converted.tofile( „converted1.csv” , sep = ',' )
Ieșire:
[[ 90 'Miere' 1 ][ 78 „Mouni” 2 ]
[ 90 'se' 3 ]
[ 57 'Radha' 4 ]]
Putem vedea că PySpark Pandas DataFrame este convertit într-o matrice NumPy (12 valori). Dacă puteți vedea datele CSV, aceasta stochează fiecare valoare de celulă într-o coloană nouă.
PySpark DataFrame la CSV folosind metoda Write.Csv().
Metoda write.csv() ia numele/calea fișierului unde trebuie să salvăm fișierul CSV ca parametru.
Sintaxă:
dataframe_object.coalesce( 1 ).write.csv( 'nume de fișier' )De fapt, CSV-ul este salvat ca partiții (mai mult de una). Pentru a scăpa de acest lucru, îmbinăm toate fișierele CSV partiționate într-unul singur. În acest scenariu, folosim funcția coalesce(). Acum, putem vedea un singur fișier CSV cu toate rândurile din PySpark DataFrame.
Exemplu:
Luați în considerare PySpark DataFrame cu 4 înregistrări având 4 coloane. Scrieți acest DataFrame în CSV cu fișierul numit „market_details”.
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date de piață cu 4 rânduri și 4 coloane
piata =[{ 'm_id' : „mz-001” , „nume_m” : „ABC” , 'm_city' : 'delhi' , 'm_state' : 'delhi' },
{ 'm_id' : „mz-002” , „nume_m” : „XYZ” , 'm_city' : 'patna' , 'm_state' : 'noroc' },
{ 'm_id' : „mz-003” , „nume_m” : „PQR” , 'm_city' : 'florida' , 'm_state' : 'unu' },
{ 'm_id' : „mz-004” , „nume_m” : „ABC” , 'm_city' : 'delhi' , 'm_state' : 'noroc' }
]
# creați cadrul de date de piață din datele de mai sus
market_df = linuxhint_spark_app.createDataFrame(piață)
# Date reale de piață
market_df.show()
# write.csv()
market_df.coalesce( 1 ).write.csv( 'detalii_piață' )
Ieșire:
Să verificăm fișierul:
Deschide ultimul fișier pentru a vedea înregistrările.
Concluzie
Am învățat cele patru scenarii diferite care convertesc PySpark DataFrame în CSV cu exemple, luând în considerare diferiți parametri. Când lucrați cu PySpark DataFrame, aveți două opțiuni pentru a converti acest DataFrame în CSV: o modalitate este utilizarea metodei write() și alta este utilizarea metodei to_csv() prin conversia în Pandas DataFrame. Dacă lucrați cu PySpark Pandas DataFrame, puteți utiliza și to_csv() și tofile() prin conversia în matricea NumPy.