Subiect de conținut:
- PySpark DataFrame la JSON Folosind To_json() cu ToPandas()
- PySpark DataFrame la JSON folosind ToJSON()
- PySpark DataFrame la JSON folosind Write.json()
Să luăm în considerare un simplu PySpark DataFrame în toate exemplele și să îl convertim în JSON folosind funcțiile menționate.
Modul necesar:
Instalați biblioteca PySpark în mediul dvs. dacă nu este încă instalată. Puteți consulta următoarea comandă pentru ao instala:
pip install pyspark
PySpark DataFrame la JSON Folosind To_json() cu ToPandas()
Metoda to_json() este disponibilă în modulul Pandas care convertește Pandas DataFrame în JSON. Putem folosi această metodă dacă ne convertim PySpark DataFrame în Pandas DataFrame. Pentru a converti PySpark DataFrame în Pandas DataFrame, se folosește metoda toPandas(). Să vedem sintaxa lui to_json() împreună cu parametrii săi.
Sintaxă:
dataframe_object.toPandas().to_json(orient,index,...)
- Orient este folosit pentru a afișa JSON convertit ca format dorit. Este nevoie de „înregistrări”, „tabel”, „valori”, „coloane”, „index”, „divizat”.
- Indexul este folosit pentru a include/elimina indexul din șirul JSON convertit. Dacă este setat la „True”, indicii sunt afișați. În caz contrar, indicii nu vor fi afișați dacă orientul este „divizat” sau „tabel”.
Exemplul 1: Orientați ca „Înregistrări”
Creați un PySpark DataFrame „skills_df” cu 3 rânduri și 4 coloane. Convertiți acest DataFrame în JSON specificând parametrul orient ca „înregistrări”.
import pysparkimporta panda
din pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date despre competențe cu 3 rânduri și 4 coloane
aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },
{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },
{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills_df = linuxhint_spark_app.createDataFrame(skills)
# Date reale despre competențe
competențe_df.show()
# Convertiți în JSON folosind to_json() cu orient ca „înregistrări”
json_skills_data = skills_df.toPandas().to_json(orient= 'înregistrări' )
print(json_skills_data)
Ieșire:
+---+------+-----+--------+
| id|persoana|premiu| pricepere|
+---+------+-----+--------+
| 123 | Miere| 25000 |pictură|
| 112 | Mouni| 2000 | dans|
| 153 |Tulasi| 1200 | lectură|
+---+------+-----+--------+
[{ 'id' : 123 , 'persoană' : 'Miere' , 'premiu' : 25000 , 'deprindere' : 'pictura' },{ 'id' : 112 , 'persoană' : 'Mouni' , 'premiu' : 2000 , 'deprindere' : 'dans' },{ 'id' : 153 , 'persoană' : 'Tulasi' , 'premiu' : 1200 , 'deprindere' : 'citind' }]
Putem vedea că PySpark DataFrame este convertit în matricea JSON cu un dicționar de valori. Aici, cheile reprezintă numele coloanei, iar valoarea reprezintă valoarea rândului/celulei din PySpark DataFrame.
Exemplul 2: Orientați ca „Split”
Formatul JSON care este returnat de orientarea „divizată” include numele coloanelor care au o listă de coloane, o listă de index și o listă de date. Următorul este formatul orientului „divizat”.
# Convertiți în JSON folosind to_json() cu orient ca „split”
json_skills_data = skills_df.toPandas().to_json(orient= 'Despică' )
print(json_skills_data)
Ieșire:
{ 'coloane' :[ 'id' , 'persoană' , 'premiu' , 'deprindere' ], 'index' :[ 0 , 1 , 2 ], 'date' :[[ 123 , 'Miere' , 25000 , 'pictura' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulasi' , 1200 , 'citind' ]]}Exemplul 3: Orientați ca „Index”
Aici, fiecare rând din PySpark DataFrame este retras sub forma unui dicționar cu cheia ca nume de coloană. Pentru fiecare dicționar, poziția indexului este specificată ca o cheie.
# Convertiți în JSON folosind to_json() cu orient ca „index”
json_skills_data = skills_df.toPandas().to_json(orient= 'index' )
print(json_skills_data)
Ieșire:
{ '0' :{ 'id' : 123 , 'persoană' : 'Miere' , 'premiu' : 25000 , 'deprindere' : 'pictura' }, '1' :{ 'id' : 112 , 'persoană' : 'Mouni' , 'premiu' : 2000 , 'deprindere' : 'dans' }, '2' :{ 'id' : 153 , 'persoană' : 'Tulasi' , 'premiu' : 1200 , 'deprindere' : 'citind' }}Exemplul 4: Orientați ca „Coloane”
Coloanele sunt cheia pentru fiecare înregistrare. Fiecare coloană conține un dicționar care preia valorile coloanei cu numere de index.
# Convertiți în JSON folosind to_json() cu orient ca „coloane”
json_skills_data = skills_df.toPandas().to_json(orient= 'coloane' )
print(json_skills_data)
Ieșire:
{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persoană' :{ '0' : 'Miere' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'premiu' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'deprindere' :{ '0' : 'pictura' , '1' : 'dans' , '2' : 'citind' }}Exemplul 5: Orientează-te ca „valori”
Dacă aveți nevoie doar de valorile în JSON, puteți alege orientarea „valori”. Afișează fiecare rând dintr-o listă. În cele din urmă, toate listele sunt stocate într-o listă. Acest JSON este de tipul listă imbricată.
# Convertiți în JSON folosind to_json() cu orient ca „valori”
json_skills_data = skills_df.toPandas().to_json(orient= „valori” )
print(json_skills_data)
Ieșire:
[[ 123 , 'Miere' , 25000 , 'pictura' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulasi' , 1200 , 'citind' ]]Exemplul 6: Orientați ca „Tabel”
Orientul „tabel” returnează JSON care include schema cu numele câmpurilor împreună cu tipurile de date coloane, indexul ca cheie primară și versiunea Pandas. Numele coloanelor cu valori sunt afișate ca „date”.
# Convertiți în JSON folosind to_json() cu orient ca „tabel”
json_skills_data = skills_df.toPandas().to_json(orient= 'masa' )
print(json_skills_data)
Ieșire:
{ 'schemă' :{ 'câmpuri' :[{ 'Nume' : 'index' , 'tip' : 'întreg' },{ 'Nume' : 'id' , 'tip' : 'întreg' },{ 'Nume' : 'persoană' , 'tip' : 'şir' },{ 'Nume' : 'premiu' , 'tip' : 'întreg' },{ 'Nume' : 'deprindere' , 'tip' : 'şir' }], 'cheia principala' :[ 'index' ], „versiunea_pandas” : „1.4.0” }, 'date' :[{ 'index' : 0 , 'id' : 123 , 'persoană' : 'Miere' , 'premiu' : 25000 , 'deprindere' : 'pictura' },{ 'index' : 1 , 'id' : 112 , 'persoană' : 'Mouni' , 'premiu' : 2000 , 'deprindere' : 'dans' },{ 'index' : 2 , 'id' : 153 , 'persoană' : 'Tulasi' , 'premiu' : 1200 , 'deprindere' : 'citind' }]}Exemplul 7: Cu Parametrul Index
În primul rând, trecem parametrul index setându-l la „True”. Veți vedea pentru fiecare valoare de coloană că poziția indexului este returnată ca o cheie într-un dicționar.
În a doua ieșire, numai numele coloanelor („coloane”) și înregistrările („date”) sunt returnate fără pozițiile indexului, deoarece indexul este setat la „False”.
# Convertiți în JSON folosind to_json() cu index=Truejson_skills_data = skills_df.toPandas().to_json(index=True)
print(json_skills_data, ' \n ' )
# Convertiți în JSON folosind to_json() cu index=False
json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'Despică' )
print(json_skills_data)
Ieșire:
{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persoană' :{ '0' : 'Miere' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'premiu' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'deprindere' :{ '0' : 'pictura' , '1' : 'dans' , '2' : 'citind' }}{ 'coloane' :[ 'id' , 'persoană' , 'premiu' , 'deprindere' ], 'date' :[[ 123 , 'Miere' , 25000 , 'pictura' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulasi' , 1200 , 'citind' ]]
PySpark DataFrame la JSON folosind ToJSON()
Metoda toJSON() este utilizată pentru a converti PySpark DataFrame într-un obiect JSON. Practic, returnează un șir JSON care este înconjurat de o listă. The [‘{coloană:valoare,…}’,…. ] este formatul care este returnat de această funcție. Aici, fiecare rând din PySpark DataFrame este returnat ca un dicționar cu numele coloanei drept cheie.
Sintaxă:
dataframe_object.toJSON()Este posibil să se transmită parametri precum indexul, etichetele coloanelor și tipul de date.
Exemplu:
Creați un PySpark DataFrame „skills_df” cu 5 rânduri și 4 coloane. Convertiți acest DataFrame în JSON folosind metoda toJSON().
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date despre competențe cu 5 rânduri și 4 coloane
aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },
{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : „muzică/dans” , 'premiu' : 2000 },
{ 'id' : 153 , 'persoană' : „Tulasi” , 'deprindere' : 'citind' , 'premiu' : 1200 },
{ 'id' : 173 , 'persoană' : 'A fugit' , 'deprindere' : 'muzică' , 'premiu' : 2000 },
{ 'id' : 43 , 'persoană' : 'Kamala' , 'deprindere' : 'citind' , 'premiu' : 10000 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills_df = linuxhint_spark_app.createDataFrame(skills)
# Date reale despre competențe
skills_df.show()
# Convertiți în matrice JSON
json_skills_data = skills_df.toJSON().collect()
print(json_skills_data)
Ieșire:
+---+------+-----+-----------+| id|persoana|premiu| pricepere|
+---+------+-----+-----------+
| 123 | Miere| 25000 | pictura|
| 112 | Mouni| 2000 |muzică/dans|
| 153 |Tulasi| 1200 | lectură|
| 173 | Ran| 2000 | muzica|
| 43 |Kamala| 10000 | lectură|
+---+------+-----+-----------+
[ „{'id':123,'person':'Miere','premiu':25000,'skill':'pictură'}' , „{'id':112,'person':'Mouni','premiu':2000,'skill':'muzică/dans'}' , „{'id':153,'person':'Tulasi','premiu':1200,'skill':'reading'}' , „{'id':173,'person':'Ran','premiu':2000,'skill':'music'}' , „{'id':43,'person':'Kamala','premiu':10000,'skill':'reading'}' ]
Există 5 rânduri în PySpark DataFrame. Toate aceste 5 rânduri sunt returnate ca un dicționar de șiruri care sunt separate prin virgulă.
PySpark DataFrame la JSON folosind Write.json()
Metoda write.json() este disponibilă în PySpark, care scrie/salvează PySpark DataFrame într-un fișier JSON. Ia numele/calea fișierului ca parametru. Practic, returnează JSON în mai multe fișiere (fișiere partiționate). Pentru a le îmbina pe toate într-un singur fișier, putem folosi metoda coalesce().
Sintaxă:
dataframe_object.coalesce( 1 ).write.json(‘nume_fișier’)- Modul Adăugare - dataframe_object.write.mode(‘append’).json(‘nume_fișier’)
- Modul de suprascriere - dataframe_object.write.mode(‘suprascriere’).json(‘nume_fișier’)
Este posibil să adăugați/suprascrieți JSON existent. Folosind write.mode(), putem adăuga datele trecând „append” sau suprascriem datele JSON existente prin trecerea „overwrite” acestei funcții.
Exemplul 1:
Creați un PySpark DataFrame „skills_df” cu 3 rânduri și 4 coloane. Scrieți acest DataFrame în JSON.
import pysparkimporta panda
din pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# date despre competențe cu 3 rânduri și 4 coloane
aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },
{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },
{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills_df = linuxhint_spark_app.createDataFrame(skills)
# write.json()
skills_df.coalesce( 1 ).write.json( 'skills_data' )
Fișier JSON:
Putem vedea că folderul skills_data include datele JSON partiționate.
Să deschidem fișierul JSON. Putem vedea că toate rândurile din PySpark DataFrame sunt convertite în JSON.
Există 5 rânduri în PySpark DataFrame. Toate aceste 5 rânduri sunt returnate ca un dicționar de șiruri care sunt separate prin virgulă.
Exemplul 2:
Creați un „skills2_df” PySpark DataFrame cu un rând. Adăugați un rând la fișierul JSON anterior specificând modul ca „adăugați”.
import pysparkimporta panda
din pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
competențe2 =[{ 'id' : 78 , 'persoană' : 'Maria' , 'deprindere' : „călărit” , 'premiu' : 8960 }
]
# creați cadrul de date privind competențele din datele de mai sus
skills2_df = linuxhint_spark_app.createDataFrame(skills2)
# write.json() cu modul de adăugare.
competențe2_df.write.mode( 'adăuga' ).json( 'skills_data' )
Fișier JSON:
Putem vedea fișierele JSON partiționate. Primul fișier conține primele înregistrări DataFrame, iar al doilea fișier deține a doua înregistrare DataFrame.
Concluzie
Există trei moduri diferite de a converti PySpark DataFrame în JSON. În primul rând, am discutat despre metoda to_json() care se convertește în JSON prin conversia PySpark DataFrame în Pandas DataFrame cu diferite exemple, luând în considerare diferiți parametri. Apoi, am folosit metoda toJSON(). În cele din urmă, am învățat cum să folosim funcția write.json() pentru a scrie PySpark DataFrame în JSON. Adăugarea și suprascrierea sunt posibile cu această funcție.