PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFrame'i teisendamine on võimalik funktsiooni pandas_udf() abil. See on kasutaja määratletud funktsioon, mida rakendatakse PySpark DataFrame'i noolega. Vektoriseeritud toiminguid saame teha pandas_udf() abil. Seda saab rakendada, läbides selle funktsiooni dekoraatorina. Sukeldume sellesse juhendisse süntaksi, parameetrite ja erinevate näidete tundmaõppimiseks.

Sisu teema:

Kui soovite teada PySpark DataFrame'i ja mooduli installimise kohta, lugege seda artiklit .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () on saadaval PySparki moodulis sql.functions, mida saab importida märksõna 'from' abil. Seda kasutatakse vektoriseeritud toimingute tegemiseks meie PySpark DataFrame'is. Seda funktsiooni rakendatakse nagu dekoraatorit, edastades kolm parameetrit. Pärast seda saame luua kasutaja määratud funktsiooni, mis tagastab andmed vektorvormingus (nagu me kasutame selleks seeriat/NumPy), kasutades noolt. Selle funktsiooni raames saame tulemuse tagastada.



Struktuur ja süntaks:



Kõigepealt vaatame selle funktsiooni struktuuri ja süntaksit:

@pandas_udf(andmetüüp)
def funktsiooni_nimi(operatsioon) -> convert_format:
tagastusavaldus

Siin on funktsiooni_nimi meie määratletud funktsiooni nimi. Andmetüüp määrab andmetüübi, mille see funktsioon tagastab. Tulemuse saame tagastada märksõna 'tagasi' abil. Kõik toimingud tehakse funktsiooni sees noolemääranguga.





Pandas_udf (funktsioon ja tagastamise tüüp)

  1. Esimene parameeter on kasutaja määratud funktsioon, mis sellele edastatakse.
  2. Teist parameetrit kasutatakse funktsiooni tagastatava andmetüübi määramiseks.

Andmed:

Kogu selles juhendis kasutame demonstreerimiseks ainult ühte PySpark DataFrame'i. Selles PySpark DataFrame'is rakendatakse kõiki meie määratletud kasutaja määratud funktsioone. Veenduge, et loote selle DataFrame'i oma keskkonnas esmalt pärast PySparki installimist.



import pyspark

pyspark.sql-st importige SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

pyspark.sql.functions importida pandas_udf

pyspark.sql.types impordist *

importida pandasid pandadeks

# köögiviljade üksikasju

köögivili =[{ 'tüüp' : 'köögivili' , 'nimi' : 'tomat' , 'locate_country' : 'USA' , 'kogus' : 800 },

{ 'tüüp' : 'puuvili' , 'nimi' : 'banaan' , 'locate_country' : 'HIINA' , 'kogus' : kakskümmend },

{ 'tüüp' : 'köögivili' , 'nimi' : 'tomat' , 'locate_country' : 'USA' , 'kogus' : 800 },

{ 'tüüp' : 'köögivili' , 'nimi' : 'Mango' , 'locate_country' : 'JAAPAN' , 'kogus' : 0 },

{ 'tüüp' : 'puuvili' , 'nimi' : 'sidrun' , 'locate_country' : 'INDIA' , 'kogus' : 1700 },

{ 'tüüp' : 'köögivili' , 'nimi' : 'tomat' , 'locate_country' : 'USA' , 'kogus' : 1200 },

{ 'tüüp' : 'köögivili' , 'nimi' : 'Mango' , 'locate_country' : 'JAAPAN' , 'kogus' : 0 },

{ 'tüüp' : 'puuvili' , 'nimi' : 'sidrun' , 'locate_country' : 'INDIA' , 'kogus' : 0 }

]

# looge ülaltoodud andmete põhjal turu andmeraamistik

market_df = linuxhint_spark_app.createDataFrame(vegetable)

market_df.show()

Väljund:

Siin loome selle DataFrame'i 4 veeru ja 8 reaga. Nüüd kasutame kasutaja määratud funktsioonide loomiseks ja nendele veergudele rakendamiseks pandas_udf().

Pandas_udf() erinevate andmetüüpidega

Selle stsenaariumi korral loome pandas_udf() abil mõned kasutaja määratud funktsioonid ja rakendame need veergudele ning kuvame tulemused select() meetodi abil. Igal juhul kasutame vektoroperatsioonide sooritamisel pandas.Series. See käsitleb veeru väärtusi ühemõõtmelise massiivina ja toiming rakendatakse veerule. Dekoraatoris endas määrame funktsiooni tagastustüübi.

Näide 1: Pandas_udf() stringitüübiga

Siin loome kaks kasutaja määratud funktsiooni stringi tagastustüübiga, et teisendada stringitüübi veeru väärtused suur- ja väiketähtedeks. Lõpuks rakendame neid funktsioone veergudele „type” ja „locate_country”.

# Teisendage tüübiveerg suurtähtedeks funktsiooniga pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

tagasta i.str.upper()

# Teisendage locate_country veerg väiketähtedeks funktsiooniga pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

tagasta i.str.lower()

# Kuvage veerud, kasutades select()

market_df.select( 'tüüp' ,type_suurtähe( 'tüüp' ), 'locate_country' ,
riik_väike_täht( 'locate_country' )).show()

Väljund:

Selgitus:

Funktsioon StringType() on saadaval moodulis pyspark.sql.types. Importisime selle mooduli juba PySpark DataFrame'i loomise ajal.

  1. Esiteks tagastab UDF (kasutaja määratud funktsioon) stringid suurtähtedega, kasutades funktsiooni str.upper(). Str.upper() on saadaval Series Data Structure'is (kuna me teisendame seeriateks funktsiooni sees oleva noolega), mis teisendab antud stringi suurtähtedeks. Lõpuks rakendatakse seda funktsiooni veerule 'tüüp', mis on määratud meetodi select() sees. Varem olid tüübiveerus kõik stringid väiketähtedega. Nüüd on need muudetud suurtähtedeks.
  2. Teiseks tagastab UDF stringid suurtähtedega, kasutades funktsiooni str.lower(). Str.lower() on saadaval seeria andmestruktuuris, mis teisendab antud stringi väiketähtedeks. Lõpuks rakendatakse seda funktsiooni veerule 'tüüp', mis on määratud meetodi select() sees. Varem olid tüübiveerus kõik stringid suurtähtedega. Nüüd on need muudetud väiketähtedeks.

Näide 2: Pandas_udf() täisarvu tüübiga

Loome UDF-i, mis teisendab PySpark DataFrame'i täisarvu veeru Pandase seeriaks ja lisame igale väärtusele 100. Edastage veerg 'kogus' sellele funktsioonile meetodi select() sees.

# Lisage 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

tagasta i+ 100

# Edastage koguse veerg ülaltoodud funktsioonile ja kuvage.

market_df.select( 'kogus' ,add_100( 'kogus' )).show()

Väljund:

Selgitus:

UDF-is kordame kõiki väärtusi ja teisendame need seeriateks. Pärast seda lisame seeria igale väärtusele 100. Lõpuks edastame sellele funktsioonile veeru “kogus” ja näeme, et kõikidele väärtustele lisatakse 100.

Pandas_udf() erinevate andmetüüpidega, kasutades Groupby() ja Agg()

Vaatame näiteid UDF-i edastamiseks koondatud veergudesse. Siin rühmitatakse veeru väärtused kõigepealt funktsiooni groupby() abil ja liitmine toimub funktsiooni agg() abil. Me edastame oma UDF-i selle koondfunktsiooni sees.

Süntaks:

pyspark_dataframe_object.groupby( 'rühmitamise_veerg' ).agg(UDF
(pyspark_dataframe_object[ 'veerg' ]))

Siin rühmitatakse kõigepealt väärtused rühmitamise veerus. Seejärel tehakse iga rühmitatud andmete kohta meie UDF-i suhtes liitmine.

Näide 1: Pandas_udf() koos koondkeskmisega()

Siin loome kasutaja määratletud funktsiooni tagastustüübi ujukiga. Funktsiooni sees arvutame keskmise, kasutades funktsiooni mean(). See UDF edastatakse veergu „kogus”, et saada iga tüübi keskmine kogus.

# tagastab keskmise/keskmise

@pandas_udf( 'hõljuda' )

def medium_function(i: panda.Series) -> float:

return i.mean()

# Edastage koguse veerg funktsioonile, rühmitades tüübiveeru.

market_df.groupby( 'tüüp' ).agg(average_function(market_df[ 'kogus' ])).show()

Väljund:

Rühmitame veerus „tüüp” olevate elementide alusel. Moodustatakse kaks rühma - 'puuvili' ja 'köögivili'. Iga rühma kohta arvutatakse ja tagastatakse keskmine.

Näide 2: Pandas_udf() koos Aggregate Max() ja Min()

Siin loome kaks kasutaja määratud funktsiooni täisarvu (int) tagastustüübiga. Esimene UDF tagastab minimaalse väärtuse ja teine ​​UDF maksimaalse väärtuse.

# pandas_udf, mis tagastavad minimaalse väärtuse

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

tagasta i.min()

# pandas_udf, mis tagastavad maksimaalse väärtuse

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

tagasta i.max()

# Edastage koguse veerg min_ pandas_udf, rühmitades locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'kogus' ])).show()

# Edastage koguse veerg max_ pandas_udf, rühmitades asukoha_riik.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'kogus' ])).show()

Väljund:

Minimaalsete ja maksimaalsete väärtuste tagastamiseks kasutame UDF-ide tagastustüübis funktsioone min () ja max (). Nüüd rühmitame andmed veergu „locate_country”. Moodustatakse neli rühma (“HIINA”, “INDIA”, “JAAPAN”, “USA”). Iga rühma kohta tagastame maksimaalse koguse. Samamoodi tagastame minimaalse koguse.

Järeldus

Põhimõtteliselt kasutatakse pandas_udf () vektoriseeritud toimingute tegemiseks meie PySpark DataFrame'is. Oleme näinud, kuidas luua pandas_udf() ja rakendada seda PySpark DataFrame'is. Parema mõistmise huvides arutasime erinevaid näiteid, võttes arvesse kõiki andmetüüpe (string, ujuk ja täisarv). Funktsiooni agg() kaudu on võimalik kasutada pandas_udf() koos groupby()-ga.