Pyspark.sql.DataFrameReader.csv()
Această metodă este utilizată pentru a citi datele din fișierul/fișierele CSV și pentru a le stoca în PySpark DataFrame. Este nevoie de opțiuni în timp ce citește CSV în DataFrame. Vom discuta diferitele opțiuni cu exemple în detaliu. În timp ce treceți mai mult de un fișier CSV, este important să treceți numele fișierelor cu extensie într-o listă care este separată de operatorul virgulă. Dacă citiți un singur fișier CSV, nu este necesar să furnizați numele fișierului într-o listă.
Sintaxă:
Fișier unic - spark_app.read.csv(‘file.csv’, Opțiuni …)
Mai multe fișiere – spark_app.read.csv([‘file1.csv’,’file2.csv’,…],opțiuni…)
De asemenea, poate fi posibilă separarea opțiunilor și a numelor fișierelor.
Un singur dosar – spark_app.read.options(opțiuni…).csv(‘fișier.csv’)
Mai multe fișiere – spark_app.read.options(opțiuni…).csv([‘file1.csv’,’file2.csv’,…])
Instalați biblioteca PySpark înainte de a implementa următoarele exemple.
pip install pyspark
După instalarea cu succes, puteți vedea rezultatul după cum urmează:
Scenariul 1: citirea antetului fișierului CSV
Să creăm un fișier CSV numit „person_skill.csv” cu 5 înregistrări, care este afișat în cele ce urmează și să îl încărcăm în PySpark DataFrame:
Parametrul antet este utilizat pentru a specifica numele coloanelor din PySpark DataFrame. Este nevoie de o valoare booleană. Dacă este „True”, numele de coloane reale care există în fișierul CSV sunt specificate în DataFrame, În caz contrar, sunt specificate c0, c1, c2... iar numele de coloane reale vor fi un rând. Cel mai bine este să setați parametrul antetului la true.
Exemplul 1: Antet = Adevărat
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Încărcați csv numit - person_skill.csv în competențe cu etichete de coloană cu antet
competențe = linuxhint_spark_app.read.csv( „person_skill.csv” , antet =Adevărat)
# Afișează DataFrame
skills.show()
Ieșire:
Explicaţie:
Putem vedea că PySpark DataFrame este creat din fișierul CSV cu coloane și rânduri specificate.
Utilizați următoarea comandă pentru a verifica coloanele:
aptitudini.coloane
Exemplul 2: Antet = Fals
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Încărcați csv numit - person_skill.csv în competențe cu etichete de coloană fără antet
competențe = linuxhint_spark_app.read.csv( „person_skill.csv” , antet = Fals)
# Afișează DataFrame
skills.show()
Ieșire:
Explicaţie:
Putem vedea că PySpark DataFrame este creat din fișierul CSV fără coloane existente.
De asemenea, coloanele existente sunt stocate ca rânduri în PySpark DataFrame.
aptitudini.coloane
Folosind Read.options.csv()
Acum, citim fișierul CSV folosind metoda read.options.csv(). Aici, trebuie să trecem opțiuni precum delimitator, antet etc. în opțiuni ca argumente și nume de fișier în csv(). Să transmitem parametrul antetului setându-l la „True”.
Scenariul 1:
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Folosind read.options.csv()
competențe = linuxhint_spark_app.read. Opțiuni ( antet =Adevărat).csv( „person_skill.csv” )
# Afișează DataFrame
skills.show()
Ieșire:
Scenariul 2: citirea delimitatorului de fișiere CSV
Parametrul delimitator ia caracterul care este folosit pentru a separa fiecare câmp. Este nevoie de o virgulă (,) în mod implicit. Să folosim același fișier CSV care este folosit în primul scenariu și să trecem virgula (‘,’) ca delimitator.
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Folosind read.options.csv() cu delimitator împreună cu antetul
competențe = linuxhint_spark_app.read. Opțiuni ( antet =Adevărat, delimitator= ',' ).csv( „person_skill.csv” )
# Afișează DataFrame
skills.show()
Ieșire:
Citirea mai multor fișiere
Până acum, am citit un singur fișier CSV. Să vedem cum să citim mai mult de un fișier CSV. În acest scenariu, rândurile din mai multe fișiere sunt atașate într-un singur PySpark DataFrame. Trebuie doar să trecem numele fișierelor într-o listă în cadrul metodei.
Exemplu:
Să avem următoarele fișiere CSV numite „person_skill.csv” și „person_skill2.csv” cu următoarele date:
Citiți aceste două fișiere CSV și stocați-le într-un singur PySpark DataFrame.
import pysparkdin pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()
# Încărcați 2 fișiere csv numite - person_skill.csv și person_skill2.csv în competențe cu etichete de coloană cu antet
competențe = linuxhint_spark_app.read.csv([ „person_skill.csv” , „person_skill2.csv” ],sept= ',' , antet =Adevărat)
skills.show()
Ieșire:
Explicaţie:
Primul CSV deține 6 înregistrări, iar al doilea CSV deține 3 înregistrări. Putem vedea că primul CSV este încărcat mai întâi în DataFrame. Apoi, al doilea CSV este încărcat. În cele din urmă, PySpark DataFrame deține 9 înregistrări.
Concluzie
Citirea CSV în PySpark DataFrame este destul de simplă cu metoda pyspark.sql.DataFrameReader.csv(). Poate fi posibilă trecerea parametrilor antet și delimitator acestei metode pentru a specifica coloanele și formatul. PySpark acceptă, de asemenea, citirea mai multor fișiere CSV în același timp, cu metodele furnizate împreună cu opțiunile acestora. În acest articol, am văzut exemplele luând în considerare diferite opțiuni. De asemenea, am văzut două moduri de a trece opțiunile la metodă.