PySpark Pandas_Udf()

Pyspark Pandas Udf



Transformarea PySpark DataFrame este posibilă folosind funcția pandas_udf(). Este o funcție definită de utilizator care este aplicată pe PySpark DataFrame cu săgeată. Putem efectua operațiile vectorizate folosind pandas_udf(). Poate fi implementat prin trecerea acestei funcții ca decorator. Să ne aprofundăm în acest ghid pentru a cunoaște sintaxa, parametrii și diferitele exemple.

Subiect de conținut:

Dacă doriți să aflați despre instalarea PySpark DataFrame și modulul, parcurgeți acest lucru articol .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () este disponibil în modulul sql.functions din PySpark, care poate fi importat folosind cuvântul cheie „de la”. Este folosit pentru a efectua operațiunile vectorizate pe PySpark DataFrame. Această funcție este implementată ca un decorator prin trecerea a trei parametri. După aceea, putem crea o funcție definită de utilizator care returnează datele în format vectorial (cum ar fi folosim serie/NumPy pentru aceasta) folosind o săgeată. În cadrul acestei funcții, putem returna rezultatul.



Structură și sintaxă:



Mai întâi, să ne uităm la structura și sintaxa acestei funcții:

@pandas_udf(tip de date)
def nume_funcție (operație) -> format_conversie:
declarație de returnare

Aici, function_name este numele functiei definite de noi. Tipul de date specifică tipul de date care este returnat de această funcție. Putem returna rezultatul folosind cuvântul cheie „return”. Toate operațiile sunt efectuate în interiorul funcției cu atribuirea săgeții.





Pandas_udf (Funcție și ReturnType)

  1. Primul parametru este funcția definită de utilizator care îi este transmisă.
  2. Al doilea parametru este utilizat pentru a specifica tipul de date returnate din funcție.

Date:

În întregul ghid, folosim un singur PySpark DataFrame pentru demonstrație. Toate funcțiile definite de utilizator pe care le definim sunt aplicate pe acest PySpark DataFrame. Asigurați-vă că creați acest DataFrame în mediul dvs. mai întâi după instalarea PySpark.



import pyspark

din pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Sugestie Linux” ).getOrCreate()

din pyspark.sql.functions import pandas_udf

din importul pyspark.sql.types *

importa panda ca panda

# detalii vegetale

legume =[{ 'tip' : 'vegetal' , 'Nume' : 'roșie' , „locate_country” : 'STATELE UNITE ALE AMERICII' , 'cantitate' : 800 },

{ 'tip' : 'fructe' , 'Nume' : 'banană' , „locate_country” : 'CHINA' , 'cantitate' : douăzeci },

{ 'tip' : 'vegetal' , 'Nume' : 'roșie' , „locate_country” : 'STATELE UNITE ALE AMERICII' , 'cantitate' : 800 },

{ 'tip' : 'vegetal' , 'Nume' : 'Mango' , „locate_country” : 'JAPONIA' , 'cantitate' : 0 },

{ 'tip' : 'fructe' , 'Nume' : 'lămâie' , „locate_country” : 'INDIA' , 'cantitate' : 1700 },

{ 'tip' : 'vegetal' , 'Nume' : 'roșie' , „locate_country” : 'STATELE UNITE ALE AMERICII' , 'cantitate' : 1200 },

{ 'tip' : 'vegetal' , 'Nume' : 'Mango' , „locate_country” : 'JAPONIA' , 'cantitate' : 0 },

{ 'tip' : 'fructe' , 'Nume' : 'lămâie' , „locate_country” : 'INDIA' , 'cantitate' : 0 }

]

# creați cadrul de date de piață din datele de mai sus

market_df = linuxhint_spark_app.createDataFrame(legumă)

market_df.show()

Ieșire:

Aici, creăm acest DataFrame cu 4 coloane și 8 rânduri. Acum, folosim pandas_udf() pentru a crea funcțiile definite de utilizator și pentru a le aplica acestor coloane.

Pandas_udf() cu diferite tipuri de date

În acest scenariu, creăm câteva funcții definite de utilizator cu pandas_udf() și le aplicăm pe coloane și afișăm rezultatele folosind metoda select(). În fiecare caz, folosim panda.Series pe măsură ce efectuăm operațiile vectorizate. Aceasta consideră valorile coloanei ca o matrice unidimensională și operația este aplicată pe coloană. În decoratorul în sine, specificăm tipul de returnare a funcției.

Exemplul 1: Pandas_udf() cu tip String

Aici, creăm două funcții definite de utilizator cu tipul șir returnat pentru a converti valorile coloanei tip șir în majuscule și litere mici. În cele din urmă, aplicăm aceste funcții pe coloanele „type” și „locate_country”.

# Convertiți coloana de tip în majuscule cu pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Convertiți coloana locate_country în litere mici cu pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Afișați coloanele folosind select()

market_df.select( 'tip' ,type_upper_case( 'tip' ), 'locate_country' ,
tara_minuscule( 'locate_country' )).spectacol()

Ieșire:

Explicaţie:

Funcția StringType() este disponibilă în modulul pyspark.sql.types. Am importat deja acest modul în timp ce am creat PySpark DataFrame.

  1. În primul rând, UDF (funcția definită de utilizator) returnează șirurile în majuscule folosind funcția str.upper(). str.upper() este disponibil în Structura de date Series (deoarece convertim în serie cu o săgeată în interiorul funcției) care convertește șirul dat în majuscule. În cele din urmă, această funcție este aplicată coloanei „type” care este specificată în cadrul metodei select(). Anterior, toate șirurile din coloana de tip sunt scrise cu litere mici. Acum, acestea sunt modificate cu majuscule.
  2. În al doilea rând, UDF returnează șirurile în majuscule folosind funcția str.lower(). str.lower() este disponibil în Structura de date în serie, care convertește șirul dat în litere mici. În cele din urmă, această funcție este aplicată coloanei „type” care este specificată în cadrul metodei select(). Anterior, toate șirurile din coloana de tip sunt scrise cu majuscule. Acum, acestea sunt modificate cu litere mici.

Exemplul 2: Pandas_udf() cu tip întreg

Să creăm un UDF care convertește coloana PySpark DataFrame întreg în seria Pandas și să adăugăm 100 la fiecare valoare. Treceți coloana „cantitate” acestei funcție în cadrul metodei select().

# Adăugați 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

returnează i+ 100

# Treceți coloana cantității la funcția de mai sus și afișați.

market_df.select( 'cantitate' ,add_100( 'cantitate' )).spectacol()

Ieșire:

Explicaţie:

În interiorul UDF, repetăm ​​toate valorile și le convertim în Series. După aceea, adăugăm 100 la fiecare valoare din serie. În cele din urmă, trecem coloana „cantitate” acestei funcții și putem vedea că la toate valorile se adaugă 100.

Pandas_udf() cu diferite tipuri de date folosind Groupby() și Agg()

Să ne uităm la exemplele pentru a trece UDF-ul coloanelor agregate. Aici, valorile coloanei sunt grupate mai întâi utilizând funcția groupby(), iar agregarea se face folosind funcția agg(). Trecem UDF-ul nostru în această funcție de agregare.

Sintaxă:

pyspark_dataframe_object.groupby( „grupare_coloană” ).agg(UDF
(pyspark_dataframe_object[ 'coloană' ]))

Aici, valorile din coloana de grupare sunt grupate mai întâi. Apoi, agregarea se face pe fiecare dată grupată în raport cu UDF-ul nostru.

Exemplul 1: Pandas_udf() cu Media agregată()

Aici, creăm o funcție definită de utilizator cu un float de tip return. În interiorul funcției, calculăm media folosind funcția mean(). Acest UDF este trecut în coloana „cantitate” pentru a obține cantitatea medie pentru fiecare tip.

# returnează media/media

@pandas_udf( 'pluti' )

def average_function(i: panda.Series) -> float:

return i.mean()

# Treceți coloana de cantitate la funcție prin gruparea coloanei de tip.

market_df.groupby( 'tip' ).agg(funcție_medie(market_df[ 'cantitate' ])).spectacol()

Ieșire:

Grupăm pe baza elementelor din coloana „tip”. Se formează două grupuri - „fructe” și „legume”. Pentru fiecare grup, se calculează media și se returnează.

Exemplul 2: Pandas_udf() cu Agregate Max() și Min()

Aici, creăm două funcții definite de utilizator cu tipul de returnare întreg (int). Primul UDF returnează valoarea minimă, iar al doilea UDF returnează valoarea maximă.

# pandas_udf care returnează valoarea minimă

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

returnează i.min()

# pandas_udf care returnează valoarea maximă

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

returnează i.max()

# Transmite coloana de cantitate la min_ pandas_udf prin gruparea locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'cantitate' ])).spectacol()

# Transmite coloana de cantitate la max_ pandas_udf prin gruparea locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'cantitate' ])).spectacol()

Ieșire:

Pentru a returna valorile minime și maxime, utilizăm funcțiile min() și max() în tipul de returnare al UDF-urilor. Acum, grupăm datele în coloana „locate_country”. Se formează patru grupuri („CHINA”, „INDIA”, „JAPONIA”, „SUA”). Pentru fiecare grupa returnam cantitatea maxima. În mod similar, returnăm cantitatea minimă.

Concluzie

Practic, pandas_udf () este folosit pentru a efectua operațiunile vectorizate pe PySpark DataFrame. Am văzut cum să creăm pandas_udf() și să îl aplicăm în PySpark DataFrame. Pentru o mai bună înțelegere, am discutat diferitele exemple luând în considerare toate tipurile de date (șir, float și întreg). Este posibil să utilizați pandas_udf() cu groupby() prin intermediul funcției agg().