Conversia PySpark DataFrame în CSV

Conversia Pyspark Dataframe In Csv



Să ne uităm la cele patru scenarii diferite de conversie a PySpark DataFrame în CSV. În mod direct, folosim metoda write.csv() pentru a converti PySpark DataFrame în CSV. Folosind funcția to_csv(), convertim PySpark Pandas DataFrame în CSV. Poate fi posibil și prin conversia acestuia în matricea NumPy.

Subiect de conținut:

Dacă doriți să aflați despre PySpark DataFrame și instalarea modulelor, parcurgeți acest lucru articol .







PySpark DataFrame în CSV prin conversia în Pandas DataFrame

To_csv() este o metodă disponibilă în modulul Pandas care convertește Pandas DataFrame în CSV. În primul rând, trebuie să convertim PySpark DataFrame în Pandas DataFrame. Pentru aceasta este folosită metoda toPandas(). Să vedem sintaxa lui to_csv() împreună cu parametrii săi.



Sintaxă:



pandas_dataframe_obj.to_csv(cale/ „nume_fișier.csv” , antet ,index,coloane,mod...)
  1. Trebuie să specificăm numele fișierului CSV. Dacă doriți să stocați fișierul CSV descărcat într-o anumită locație de pe computer, puteți specifica și calea împreună cu numele fișierului.
  2. Coloanele sunt incluse dacă antetul este setat la „Adevărat”. Dacă nu aveți nevoie de coloane, setați antetul la „False”.
  3. Indicii sunt specificați dacă indexul este setat la „True”. Dacă nu aveți nevoie de indici, setați indexul la „False”.
  4. Parametrul Coloane preia o listă de nume de coloane în care putem specifica ce coloane particulare sunt extrase în fișierul CSV.
  5. Putem adăuga înregistrările în CSV utilizând parametrul mode. Adăugați – „a” este folosit pentru a face acest lucru.

Exemplul 1: Cu Parametrii Antet și Index

Creați „skills_df” PySpark DataFrame cu 3 rânduri și 4 coloane. Convertiți acest DataFrame în CSV conversia mai întâi în Pandas DataFrame.





import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date despre competențe cu 3 rânduri și 4 coloane

aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },

{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },

{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills_df = linuxhint_spark_app.createDataFrame(skills)

skills_df.show()

# Convertiți skills_df în Pandas DataFrame

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Convertiți acest DataFrame în csv cu antet și index

pandas_skills_df.to_csv( „pandas_skills1.csv” , antet =Adevărat, index=Adevărat)

Ieșire:



Putem vedea că PySpark DataFrame este convertit în Pandas DataFrame. Să vedem dacă este convertit în CSV cu nume de coloane și indici:

Exemplul 2: Adăugați datele la CSV

Creați încă un PySpark DataFrame cu 1 înregistrare și adăugați-l la CSV, care este creat ca parte a primului nostru exemplu. Asigurați-vă că trebuie să setăm antetul la „False” împreună cu parametrul mod. În caz contrar, numele coloanelor sunt, de asemenea, atașate ca un rând.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

aptitudini =[{ 'id' : 90 , 'persoană' : „Bhargav” , 'deprindere' : 'citind' , 'premiu' : 12000 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Convertiți skills_df în Pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Adăugați acest DataFrame la fișierul pandas_skills1.csv

pandas_skills_df.to_csv( „pandas_skills1.csv” , mod= 'A' , antet = Fals)

Ieșire CSV:

Putem vedea că un nou rând este adăugat la fișierul CSV.

Exemplul 3: Cu parametrul Coloane

Să avem același DataFrame și să-l convertim în CSV cu două coloane: „persoană” și „premiu”.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date despre competențe cu 3 rânduri și 4 coloane

aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },

{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },

{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Convertiți skills_df în Pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Convertiți acest DataFrame în csv cu anumite coloane

pandas_skills_df.to_csv( „pandas_skills2.csv” , coloane=[ 'persoană' , 'premiu' ])

Ieșire CSV:

Putem vedea că numai coloanele „persoană” și „premiu” există în fișierul CSV.

PySpark Pandas DataFrame în CSV folosind metoda To_Csv().

To_csv() este o metodă disponibilă în modulul Pandas care convertește Pandas DataFrame în CSV. În primul rând, trebuie să convertim PySpark DataFrame în Pandas DataFrame. Pentru aceasta este folosită metoda toPandas(). Să vedem sintaxa lui to_csv() împreună cu parametrii săi:

Sintaxă:

pyspark_pandas_dataframe_obj.to_csv(cale/ „nume_fișier.csv” , antet ,index,coloane,...)
  1. Trebuie să specificăm numele fișierului CSV. Dacă doriți să stocați fișierul CSV descărcat într-o anumită locație de pe computer, puteți specifica și calea împreună cu numele fișierului.
  2. Coloanele sunt incluse dacă antetul este setat la „Adevărat”. Dacă nu aveți nevoie de coloane, setați antetul la „False”.
  3. Indicii sunt specificați dacă indexul este setat la „True”. Dacă nu aveți nevoie de indici, setați indexul la „False”.
  4. Parametrul coloane preia o listă de nume de coloane în care putem specifica ce coloane particulare sunt extrase în fișierul CSV.

Exemplul 1: Cu parametrul Coloane

Creați un PySpark Pandas DataFrame cu 3 coloane și convertiți-l în CSV folosind to_csv() cu coloanele „persoană” și „premiu”.

de la pyspark import panda

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persoană' :[ 'Miere' , „Mouni” , 'se' , 'Radha' ], 'premiu' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Convertiți acest DataFrame în csv cu anumite coloane

pyspark_pandas_dataframe.to_csv( „pyspark_pandas1” , coloane=[ 'persoană' , 'premiu' ])

Ieșire:

Putem vedea că PySpark Pandas DataFrame este convertit în CSV cu două partiții. Fiecare partiție deține 2 înregistrări. De asemenea, coloanele din CSV sunt doar „persoană” și „premiu”.

Fișierul de partiție 1:

Fișierul de partiție 2:

Exemplul 2: Cu parametrul Header

Utilizați DataFrame anterior și specificați parametrul antetului setându-l la „True”.

de la pyspark import panda

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persoană' :[ 'Miere' , „Mouni” , 'se' , 'Radha' ], 'premiu' :[ 1 , 2 , 3 , 4 ]})

# Convertiți acest DataFrame în csv cu antet.

pyspark_pandas_dataframe.to_csv( „pyspark_pandas2” , antet =Adevărat)

Ieșire CSV:

Putem vedea că PySpark Pandas DataFrame este convertit în CSV cu două partiții. Fiecare partiție deține 2 înregistrări cu nume de coloane.

Fișierul de partiție 1:

Fișierul de partiție 2:

PySpark Pandas DataFrame în CSV prin conversia în NumPy Array

Avem o opțiune de a converti PySpark Pandas DataFrame în CSV prin conversia în matricea Numpy. To_numpy() este o metodă disponibilă în modulul PySpark Pandas care convertește PySpark Pandas DataFrame în matricea NumPy.

Sintaxă:

pyspark_pandas_dataframe_obj.to_numpy()

Nu va lua niciun parametru.

Folosind metoda Tofile().

După conversia în matricea NumPy, putem folosi metoda tofile() pentru a converti NumPy în CSV. Aici, stochează fiecare înregistrare într-o nouă celulă, în funcție de coloană, în fișierul CSV.

Sintaxă:

array_obj.to_numpy(filename/path,sep=’ ’)

Ia numele fișierului sau calea unui CSV și un separator.

Exemplu:

Creați PySpark Pandas DataFrame cu 3 coloane și 4 înregistrări și convertiți-l în CSV conversia mai întâi într-o matrice NumPy.

de la pyspark import panda

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'persoană' :[ 'Miere' , „Mouni” , 'se' , 'Radha' ], 'premiu' :[ 1 , 2 , 3 , 4 ]})

# Convertiți DataFrame de mai sus în matrice numpy

convertit = pyspark_pandas_dataframe.to_numpy()

imprimare (convertit)

# Folosind tofile()

converted.tofile( „converted1.csv” , sep = ',' )

Ieșire:

[[ 90 'Miere' 1 ]

[ 78 „Mouni” 2 ]

[ 90 'se' 3 ]

[ 57 'Radha' 4 ]]

Putem vedea că PySpark Pandas DataFrame este convertit într-o matrice NumPy (12 valori). Dacă puteți vedea datele CSV, aceasta stochează fiecare valoare de celulă într-o coloană nouă.

PySpark DataFrame la CSV folosind metoda Write.Csv().

Metoda write.csv() ia numele/calea fișierului unde trebuie să salvăm fișierul CSV ca parametru.

Sintaxă:

dataframe_object.coalesce( 1 ).write.csv( 'nume de fișier' )

De fapt, CSV-ul este salvat ca partiții (mai mult de una). Pentru a scăpa de acest lucru, îmbinăm toate fișierele CSV partiționate într-unul singur. În acest scenariu, folosim funcția coalesce(). Acum, putem vedea un singur fișier CSV cu toate rândurile din PySpark DataFrame.

Exemplu:

Luați în considerare PySpark DataFrame cu 4 înregistrări având 4 coloane. Scrieți acest DataFrame în CSV cu fișierul numit „market_details”.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date de piață cu 4 rânduri și 4 coloane

piata =[{ 'm_id' : „mz-001” , „nume_m” : „ABC” , 'm_city' : 'delhi' , 'm_state' : 'delhi' },

{ 'm_id' : „mz-002” , „nume_m” : „XYZ” , 'm_city' : 'patna' , 'm_state' : 'noroc' },

{ 'm_id' : „mz-003” , „nume_m” : „PQR” , 'm_city' : 'florida' , 'm_state' : 'unu' },

{ 'm_id' : „mz-004” , „nume_m” : „ABC” , 'm_city' : 'delhi' , 'm_state' : 'noroc' }

]



# creați cadrul de date de piață din datele de mai sus

market_df = linuxhint_spark_app.createDataFrame(piață)

# Date reale de piață

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'detalii_piață' )

Ieșire:

Să verificăm fișierul:

Deschide ultimul fișier pentru a vedea înregistrările.

Concluzie

Am învățat cele patru scenarii diferite care convertesc PySpark DataFrame în CSV cu exemple, luând în considerare diferiți parametri. Când lucrați cu PySpark DataFrame, aveți două opțiuni pentru a converti acest DataFrame în CSV: o modalitate este utilizarea metodei write() și alta este utilizarea metodei to_csv() prin conversia în Pandas DataFrame. Dacă lucrați cu PySpark Pandas DataFrame, puteți utiliza și to_csv() și tofile() prin conversia în matricea NumPy.