PySpark Citiți JSON()

Pyspark Cititi Json



În timp ce lucrați cu PySpark DataFrames, acesta trebuie să fie stocat în PySpark DataFrame dacă doriți să procesați datele JSON. După stocarea în DataFrame, putem aplica diferitele operații și metode asupra datelor. De asemenea, există atât de multe avantaje dacă convertim JSON în PySpark DataFrame, deoarece este simplu și putem transforma/partiționa datele într-un mod mai simplu.

Subiect de conținut:

Citirea JSON în PySpark DataFrame folosind Pandas.read_json()







Citirea JSON în PySpark DataFrame folosind Spark.read.json()



Citirea JSON în PySpark DataFrame folosind PySpark SQL



În acest tutorial, vom analiza cum să citim JSON în PySpark DataFrame folosind pandas.read_json(), spark.read.json() și spark.sql. În toate scenariile, ne vom uita la diferitele exemple luând în considerare diferitele formate JSON.





Instalați biblioteca PySpark înainte de a implementa următoarele exemple.

pip install pyspark

După instalarea cu succes, puteți vedea rezultatul după cum urmează:



Citirea JSON în PySpark DataFrame folosind Pandas.read_json()

În PySpark, metoda createDataFrame() este utilizată pentru a crea direct DataFrame. Aici, trebuie doar să trecem fișierul/calea JSON către fișierul JSON prin metoda pandas.read_json(). Această metodă read_json() preia numele fișierului/calea care este disponibilă în modulul Pandas. Acesta este motivul pentru care este necesar să importați și să utilizați modulul Pandas.

Sintaxă:

spark_app.createDataFrame(pandas.read_json( „nume_fișier.json” ))

Exemplu:

Să creăm un fișier JSON numit „student_skill.json” care conține 2 înregistrări. Aici, cheile/coloanele sunt „Student 1” și „Student 2”. Rândurile sunt nume, vârstă, abilitate1 și abilitate2.

import pyspark

importa panda

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Folosind pandas.read_json()

candidate_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( „student_skill.json” ))

candidate_skills.show()

Ieșire:

Putem vedea că datele JSON sunt convertite în PySpark DataFrame cu coloane și rânduri specificate.

2. Citirea JSON în PySpark DataFrame folosind Spark.read.json()

Read.json() este o metodă similară cu read_json() în Pandas. Aici, read.json() ia o cale către JSON sau direct către fișierul JSON și îl încarcă direct în PySpark DataFrame. Nu este nevoie să utilizați metoda createDataFrame() în acest scenariu. Dacă doriți să citiți mai multe fișiere JSON simultan, trebuie să trecem o listă de nume de fișiere JSON printr-o listă care este separată prin virgulă. Toate înregistrările JSON sunt stocate într-un singur DataFrame.

Sintaxă:

Fișier unic - spark_app.read.json( „nume_fișier.json” )

Mai multe fișiere - spark_app.read.json([ „fișier1.json” , „fișier2.json” ,...])

Scenariul 1: Citiți JSON având o singură linie

Dacă fișierul dvs. JSON este în formatele record1, record2, record3... (o singură linie), îl putem numi ca JSON cu linii unice. Spark procesează aceste înregistrări și le stochează în PySpark DataFrame sub formă de rânduri. Fiecare înregistrare este un rând în PySpark DataFrame.

Să creăm un fișier JSON numit „candidate_skills.json” care conține 3 înregistrări. Citiți acest JSON în PySpark DataFrame.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți candidate_skills.json în PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json( „candidate_skills.json” )

candidate_skills.show()

Ieșire:

Putem vedea că datele JSON sunt convertite în PySpark DataFrame cu înregistrări și nume de coloane specificate.

Scenariul 2: Citiți JSON având mai multe linii

Dacă fișierul dvs. JSON are mai multe linii, trebuie să utilizați metoda read.option().json() pentru a transmite parametrul multilinie care trebuie setat la true. Acest lucru ne permite să încărcăm JSON cu mai multe linii în PySpark DataFrame.

read.option( 'multilinie' , 'Adevărat' ).json( „nume_fișier.json” )

Să creăm un fișier JSON numit „multi.json” care conține 3 înregistrări. Citiți acest JSON în PySpark DataFrame.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți multi.json (având mai multe linii) în PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.option( 'multilinie' , 'Adevărat' ).json( „multi.json” )

candidate_skills.show()

Ieșire:

Scenariul 3: Citiți mai multe JSON

Am discutat deja la faza de început a acestui tutorial cu privire la mai multe fișiere JSON. Dacă doriți să citiți mai multe fișiere JSON simultan și să le stocați într-un singur PySpark DataFrame, trebuie să transmitem o listă de nume de fișiere metodei read.json().

Să creăm două fișiere JSON numite „candidate_skills.json” și „candidate_skills2.json” și să le încărcăm în PySpark DataFrame.

Fișierul „candidate_skills.json” conține trei înregistrări.

Fișierul „candidate_skill2.json” conține doar o singură înregistrare.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți fișierele candidate_skills și candidate_skills2 odată în PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json([ „candidate_skills.json” , „candidate_skills2.json” ])

candidate_skills.show()

Ieșire:

În cele din urmă, DataFrame deține patru înregistrări. Primele trei înregistrări aparțin primului JSON, iar ultimele înregistrări aparțin celui de-al doilea JSON.

Citirea JSON în PySpark DataFrame folosind Spark.read.json()

Read.json() este o metodă similară cu read_json() în Pandas. Aici, read.json() ia o cale către JSON sau direct către fișierul JSON și o încarcă direct în PySpark DataFrame. Nu este nevoie să utilizați metoda createDataFrame() în acest scenariu. Dacă doriți să citiți mai multe fișiere JSON simultan, trebuie să trecem o listă de nume de fișiere JSON printr-o listă care este separată prin virgulă. Toate înregistrările JSON sunt stocate într-un singur DataFrame.

Sintaxă:

Fișier unic - spark_app.read.json( „nume_fișier.json” )

Mai multe fișiere - spark_app.read.json([ „fișier1.json” , „fișier2.json” ,...])

Scenariul 1: Citiți JSON având o singură linie

Dacă fișierul dvs. JSON este în formatul record1, record2, record3... (o singură linie), îl putem numi ca JSON cu linii unice. Spark procesează aceste înregistrări și le stochează în PySpark DataFrame sub formă de rânduri. Fiecare înregistrare este un rând în PySpark DataFrame.

Să creăm un fișier JSON numit „candidate_skills.json” care conține 3 înregistrări. Citiți acest JSON în PySpark DataFrame.

import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Citiți candidate_skills.json în PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json( „candidate_skills.json” )

candidate_skills.show()

Ieșire:

Putem vedea că datele JSON sunt convertite în PySpark DataFrame cu înregistrări și nume de coloane specificate.

Citirea JSON în PySpark DataFrame folosind PySpark SQL

Poate fi posibil să creați o vizualizare temporară a datelor noastre JSON folosind PySpark SQL. Direct, putem furniza JSON în momentul creării vizualizării temporare. Uită-te la următoarea sintaxă. După aceea, putem folosi comanda SELECT pentru a afișa PySpark DataFrame.

Sintaxă:

spark_app.sql( „CREAȚI VIEW_NAME TEMPORARĂ FOLOSIND OPȚIUNI json (calea „nume_fișier.json”)” )

Aici, „VIEW_NAME” este vizualizarea datelor JSON, iar „file_name” este numele fișierului JSON.

Exemplul 1:

Luați în considerare fișierul JSON care este utilizat în exemplele anterioare – „candidate_skills.json”. Selectați toate rândurile din DataFrame folosind SELECT cu operatorul „*”. Aici, * selectează toate coloanele din PySpark DataFrame.

import pyspark

importa panda

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

# Folosind spark.sql pentru a crea VIEW din JSON

candidate_skills = linuxhint_spark_app.sql( „CREAȚI VEDERE TEMPORARĂ Candidate_data FOLOSIND OPȚIUNI JSON (calea „candidate_skills.json”)” )

# Utilizați interogarea SELECT pentru a selecta toate înregistrările din Candidate_data.

linuxhint_spark_app.sql( „SELECT * din Candidate_data” ).spectacol()

Ieșire:

Numărul total de înregistrări din PySpark DataFrame (citit din JSON) este 3.

Exemplul 2:

Acum, filtrați înregistrările din PySpark DataFrame în funcție de coloana de vârstă. Utilizați operatorul „mai mare decât” pentru vârsta pentru a obține rândurile cu o vârstă mai mare de 22 de ani.

# Utilizați interogarea SELECT pentru a selecta înregistrările cu vârsta > 22.

linuxhint_spark_app.sql( „SELECT * din Candidate_data unde vârsta>22” ).spectacol()

Ieșire:

Există o singură înregistrare în PySpark DataFrame cu o vârstă mai mare de 22 de ani.

Concluzie

Am învățat cele trei moduri diferite de a citi JSON în PySpark DataFrame. Mai întâi, am învățat cum să folosim metoda read_json() disponibilă în modulul Pandas pentru a citi JSON în PySpark DataFrame. Apoi, am învățat cum să citim fișierele JSON cu o singură linie/mai multe linii folosind metoda spark.read.json() cu opțiunea(). Pentru a citi mai multe fișiere JSON în același timp, trebuie să transmitem acestei metode o listă de nume de fișiere. Folosind PySpark SQL, fișierul JSON este citit în vizualizarea temporară și DataFrame este afișat folosind interogarea SELECT.