Conversia PySpark DataFrame în JSON

Conversia Pyspark Dataframe In Json



Este posibilă transmiterea unei date structurate folosind JSON și, de asemenea, consumă memorie redusă. În comparație cu PySpark RDD sau PySpark DataFrame, JSON consumă memorie redusă și serializare, ceea ce este posibil cu JSON. Putem converti PySpark DataFrame în JSON folosind metoda pyspark.sql.DataFrameWriter.json(). În afară de aceasta, există alte două moduri de a converti DataFrame în JSON.

Subiect de conținut:

Să luăm în considerare un simplu PySpark DataFrame în toate exemplele și să îl convertim în JSON folosind funcțiile menționate.







Modul necesar:

Instalați biblioteca PySpark în mediul dvs. dacă nu este încă instalată. Puteți consulta următoarea comandă pentru ao instala:



pip install pyspark

PySpark DataFrame la JSON Folosind To_json() cu ToPandas()

Metoda to_json() este disponibilă în modulul Pandas care convertește Pandas DataFrame în JSON. Putem folosi această metodă dacă ne convertim PySpark DataFrame în Pandas DataFrame. Pentru a converti PySpark DataFrame în Pandas DataFrame, se folosește metoda toPandas(). Să vedem sintaxa lui to_json() împreună cu parametrii săi.



Sintaxă:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient este folosit pentru a afișa JSON convertit ca format dorit. Este nevoie de „înregistrări”, „tabel”, „valori”, „coloane”, „index”, „divizat”.
  2. Indexul este folosit pentru a include/elimina indexul din șirul JSON convertit. Dacă este setat la „True”, indicii sunt afișați. În caz contrar, indicii nu vor fi afișați dacă orientul este „divizat” sau „tabel”.

Exemplul 1: Orientați ca „Înregistrări”

Creați un PySpark DataFrame „skills_df” cu 3 rânduri și 4 coloane. Convertiți acest DataFrame în JSON specificând parametrul orient ca „înregistrări”.

import pyspark

importa panda

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date despre competențe cu 3 rânduri și 4 coloane

aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },

{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },

{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Date reale despre competențe

competențe_df.show()

# Convertiți în JSON folosind to_json() cu orient ca „înregistrări”

json_skills_data = skills_df.toPandas().to_json(orient= 'înregistrări' )

print(json_skills_data)

Ieșire:



+---+------+-----+--------+

| id|persoana|premiu| pricepere|

+---+------+-----+--------+

| 123 | Miere| 25000 |pictură|

| 112 | Mouni| 2000 | dans|

| 153 |Tulasi| 1200 | lectură|

+---+------+-----+--------+

[{ 'id' : 123 , 'persoană' : 'Miere' , 'premiu' : 25000 , 'deprindere' : 'pictura' },{ 'id' : 112 , 'persoană' : 'Mouni' , 'premiu' : 2000 , 'deprindere' : 'dans' },{ 'id' : 153 , 'persoană' : 'Tulasi' , 'premiu' : 1200 , 'deprindere' : 'citind' }]

Putem vedea că PySpark DataFrame este convertit în matricea JSON cu un dicționar de valori. Aici, cheile reprezintă numele coloanei, iar valoarea reprezintă valoarea rândului/celulei din PySpark DataFrame.

Exemplul 2: Orientați ca „Split”

Formatul JSON care este returnat de orientarea „divizată” include numele coloanelor care au o listă de coloane, o listă de index și o listă de date. Următorul este formatul orientului „divizat”.

# Convertiți în JSON folosind to_json() cu orient ca „split”

json_skills_data = skills_df.toPandas().to_json(orient= 'Despică' )

print(json_skills_data)

Ieșire:

{ 'coloane' :[ 'id' , 'persoană' , 'premiu' , 'deprindere' ], 'index' :[ 0 , 1 , 2 ], 'date' :[[ 123 , 'Miere' , 25000 , 'pictura' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulasi' , 1200 , 'citind' ]]}

Exemplul 3: Orientați ca „Index”

Aici, fiecare rând din PySpark DataFrame este retras sub forma unui dicționar cu cheia ca nume de coloană. Pentru fiecare dicționar, poziția indexului este specificată ca o cheie.

# Convertiți în JSON folosind to_json() cu orient ca „index”

json_skills_data = skills_df.toPandas().to_json(orient= 'index' )

print(json_skills_data)

Ieșire:

{ '0' :{ 'id' : 123 , 'persoană' : 'Miere' , 'premiu' : 25000 , 'deprindere' : 'pictura' }, '1' :{ 'id' : 112 , 'persoană' : 'Mouni' , 'premiu' : 2000 , 'deprindere' : 'dans' }, '2' :{ 'id' : 153 , 'persoană' : 'Tulasi' , 'premiu' : 1200 , 'deprindere' : 'citind' }}

Exemplul 4: Orientați ca „Coloane”

Coloanele sunt cheia pentru fiecare înregistrare. Fiecare coloană conține un dicționar care preia valorile coloanei cu numere de index.

# Convertiți în JSON folosind to_json() cu orient ca „coloane”

json_skills_data = skills_df.toPandas().to_json(orient= 'coloane' )

print(json_skills_data)

Ieșire:

{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persoană' :{ '0' : 'Miere' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'premiu' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'deprindere' :{ '0' : 'pictura' , '1' : 'dans' , '2' : 'citind' }}

Exemplul 5: Orientează-te ca „valori”

Dacă aveți nevoie doar de valorile în JSON, puteți alege orientarea „valori”. Afișează fiecare rând dintr-o listă. În cele din urmă, toate listele sunt stocate într-o listă. Acest JSON este de tipul listă imbricată.

# Convertiți în JSON folosind to_json() cu orient ca „valori”

json_skills_data = skills_df.toPandas().to_json(orient= „valori” )

print(json_skills_data)

Ieșire:

[[ 123 , 'Miere' , 25000 , 'pictura' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulasi' , 1200 , 'citind' ]]

Exemplul 6: Orientați ca „Tabel”

Orientul „tabel” returnează JSON care include schema cu numele câmpurilor împreună cu tipurile de date coloane, indexul ca cheie primară și versiunea Pandas. Numele coloanelor cu valori sunt afișate ca „date”.

# Convertiți în JSON folosind to_json() cu orient ca „tabel”

json_skills_data = skills_df.toPandas().to_json(orient= 'masa' )

print(json_skills_data)

Ieșire:

{ 'schemă' :{ 'câmpuri' :[{ 'Nume' : 'index' , 'tip' : 'întreg' },{ 'Nume' : 'id' , 'tip' : 'întreg' },{ 'Nume' : 'persoană' , 'tip' : 'şir' },{ 'Nume' : 'premiu' , 'tip' : 'întreg' },{ 'Nume' : 'deprindere' , 'tip' : 'şir' }], 'cheia principala' :[ 'index' ], „versiunea_pandas” : „1.4.0” }, 'date' :[{ 'index' : 0 , 'id' : 123 , 'persoană' : 'Miere' , 'premiu' : 25000 , 'deprindere' : 'pictura' },{ 'index' : 1 , 'id' : 112 , 'persoană' : 'Mouni' , 'premiu' : 2000 , 'deprindere' : 'dans' },{ 'index' : 2 , 'id' : 153 , 'persoană' : 'Tulasi' , 'premiu' : 1200 , 'deprindere' : 'citind' }]}

Exemplul 7: Cu Parametrul Index

În primul rând, trecem parametrul index setându-l la „True”. Veți vedea pentru fiecare valoare de coloană că poziția indexului este returnată ca o cheie într-un dicționar.

În a doua ieșire, numai numele coloanelor („coloane”) și înregistrările („date”) sunt returnate fără pozițiile indexului, deoarece indexul este setat la „False”.

# Convertiți în JSON folosind to_json() cu index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

print(json_skills_data, ' \n ' )

# Convertiți în JSON folosind to_json() cu index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'Despică' )

print(json_skills_data)

Ieșire:

{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persoană' :{ '0' : 'Miere' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'premiu' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'deprindere' :{ '0' : 'pictura' , '1' : 'dans' , '2' : 'citind' }}

{ 'coloane' :[ 'id' , 'persoană' , 'premiu' , 'deprindere' ], 'date' :[[ 123 , 'Miere' , 25000 , 'pictura' ],[ 112 , 'Mouni' , 2000 , 'dans' ],[ 153 , 'Tulasi' , 1200 , 'citind' ]]

PySpark DataFrame la JSON folosind ToJSON()

Metoda toJSON() este utilizată pentru a converti PySpark DataFrame într-un obiect JSON. Practic, returnează un șir JSON care este înconjurat de o listă. The [‘{coloană:valoare,…}’,…. ] este formatul care este returnat de această funcție. Aici, fiecare rând din PySpark DataFrame este returnat ca un dicționar cu numele coloanei drept cheie.

Sintaxă:

dataframe_object.toJSON()

Este posibil să se transmită parametri precum indexul, etichetele coloanelor și tipul de date.

Exemplu:

Creați un PySpark DataFrame „skills_df” cu 5 rânduri și 4 coloane. Convertiți acest DataFrame în JSON folosind metoda toJSON().

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date despre competențe cu 5 rânduri și 4 coloane

aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },

{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : „muzică/dans” , 'premiu' : 2000 },

{ 'id' : 153 , 'persoană' : „Tulasi” , 'deprindere' : 'citind' , 'premiu' : 1200 },

{ 'id' : 173 , 'persoană' : 'A fugit' , 'deprindere' : 'muzică' , 'premiu' : 2000 },

{ 'id' : 43 , 'persoană' : 'Kamala' , 'deprindere' : 'citind' , 'premiu' : 10000 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Date reale despre competențe

skills_df.show()

# Convertiți în matrice JSON

json_skills_data = skills_df.toJSON().collect()

print(json_skills_data)

Ieșire:

+---+------+-----+-----------+

| id|persoana|premiu| pricepere|

+---+------+-----+-----------+

| 123 | Miere| 25000 | pictura|

| 112 | Mouni| 2000 |muzică/dans|

| 153 |Tulasi| 1200 | lectură|

| 173 | Ran| 2000 | muzica|

| 43 |Kamala| 10000 | lectură|

+---+------+-----+-----------+

[ „{'id':123,'person':'Miere','premiu':25000,'skill':'pictură'}' , „{'id':112,'person':'Mouni','premiu':2000,'skill':'muzică/dans'}' , „{'id':153,'person':'Tulasi','premiu':1200,'skill':'reading'}' , „{'id':173,'person':'Ran','premiu':2000,'skill':'music'}' , „{'id':43,'person':'Kamala','premiu':10000,'skill':'reading'}' ]

Există 5 rânduri în PySpark DataFrame. Toate aceste 5 rânduri sunt returnate ca un dicționar de șiruri care sunt separate prin virgulă.

PySpark DataFrame la JSON folosind Write.json()

Metoda write.json() este disponibilă în PySpark, care scrie/salvează PySpark DataFrame într-un fișier JSON. Ia numele/calea fișierului ca parametru. Practic, returnează JSON în mai multe fișiere (fișiere partiționate). Pentru a le îmbina pe toate într-un singur fișier, putem folosi metoda coalesce().

Sintaxă:

dataframe_object.coalesce( 1 ).write.json(‘nume_fișier’)
  1. Modul Adăugare - dataframe_object.write.mode(‘append’).json(‘nume_fișier’)
  2. Modul de suprascriere - dataframe_object.write.mode(‘suprascriere’).json(‘nume_fișier’)

Este posibil să adăugați/suprascrieți JSON existent. Folosind write.mode(), putem adăuga datele trecând „append” sau suprascriem datele JSON existente prin trecerea „overwrite” acestei funcții.

Exemplul 1:

Creați un PySpark DataFrame „skills_df” cu 3 rânduri și 4 coloane. Scrieți acest DataFrame în JSON.

import pyspark

importa panda

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# date despre competențe cu 3 rânduri și 4 coloane

aptitudini =[{ 'id' : 123 , 'persoană' : 'Miere' , 'deprindere' : 'pictura' , 'premiu' : 25000 },

{ 'id' : 112 , 'persoană' : „Mouni” , 'deprindere' : 'dans' , 'premiu' : 2000 },

{ 'id' : 153 , 'persoană' : 'Tulasi' , 'deprindere' : 'citind' , 'premiu' : 1200 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills_df = linuxhint_spark_app.createDataFrame(skills)

# write.json()

skills_df.coalesce( 1 ).write.json( 'skills_data' )

Fișier JSON:

Putem vedea că folderul skills_data include datele JSON partiționate.

Să deschidem fișierul JSON. Putem vedea că toate rândurile din PySpark DataFrame sunt convertite în JSON.

Există 5 rânduri în PySpark DataFrame. Toate aceste 5 rânduri sunt returnate ca un dicționar de șiruri care sunt separate prin virgulă.

Exemplul 2:

Creați un „skills2_df” PySpark DataFrame cu un rând. Adăugați un rând la fișierul JSON anterior specificând modul ca „adăugați”.

import pyspark

importa panda

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

competențe2 =[{ 'id' : 78 , 'persoană' : 'Maria' , 'deprindere' : „călărit” , 'premiu' : 8960 }

]

# creați cadrul de date privind competențele din datele de mai sus

skills2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() cu modul de adăugare.

competențe2_df.write.mode( 'adăuga' ).json( 'skills_data' )

Fișier JSON:

Putem vedea fișierele JSON partiționate. Primul fișier conține primele înregistrări DataFrame, iar al doilea fișier deține a doua înregistrare DataFrame.

Concluzie

Există trei moduri diferite de a converti PySpark DataFrame în JSON. În primul rând, am discutat despre metoda to_json() care se convertește în JSON prin conversia PySpark DataFrame în Pandas DataFrame cu diferite exemple, luând în considerare diferiți parametri. Apoi, am folosit metoda toJSON(). În cele din urmă, am învățat cum să folosim funcția write.json() pentru a scrie PySpark DataFrame în JSON. Adăugarea și suprascrierea sunt posibile cu această funcție.