Kuidas PySparkis tabeliandmeid lugeda ja kirjutada

Kuidas Pysparkis Tabeliandmeid Lugeda Ja Kirjutada



Andmetöötlus PySparkis on kiirem, kui andmed laaditakse tabeli kujul. Sellega, kasutades SQL-i avaldisi, on töötlemine kiire. Seega on PySpark DataFrame/RDD teisendamine tabeliks enne töötlemiseks saatmist parem lähenemine. Täna näeme, kuidas lugeda tabeliandmeid PySpark DataFrame'i, kirjutada PySpark DataFrame tabelisse ja lisada olemasolevasse tabelisse sisseehitatud funktsioonide abil uus DataFrame. Lähme!

Pyspark.sql.DataFrameWriter.saveAsTable()

Esiteks näeme, kuidas kirjutada olemasolev PySpark DataFrame tabelisse funktsiooni write.saveAsTable() abil. DataFrame'i tabelisse kirjutamiseks on vaja tabeli nime ja muid valikulisi parameetreid, nagu mode, partitionBy jne. Seda hoitakse parketifailina.

Süntaks:







dataframe_obj.write.saveAsTable(tee/tabeli_nimi,režiim,partitionBy,…)
  1. Tabeli_nimi on tabeli nimi, mis luuakse failist dataframe_obj.
  2. Tabeli andmeid saame lisada/üle kirjutada režiimi parameetri abil.
  3. PartitionBy kasutab nendes veergudes olevate väärtuste põhjal partitsioonide loomiseks üksikuid/mitu veergu.

Näide 1:

Looge 5 rea ja 4 veeruga PySpark DataFrame. Kirjutage see andmeraam tabelisse nimega 'Agri_Table1'.



import pyspark

pyspark.sql-st importige SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# põllumajandusandmed 5 rea ja 5 veeruga

agri =[{ 'Soil_Type' : 'must' , „Niisutus_kättesaadavus” : 'ei' , 'Aakrid' : 2500 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'USA' },

{ 'Soil_Type' : 'must' , „Niisutus_kättesaadavus” : 'jah' , 'Aakrid' : 3500 , 'Mulla_status' : 'Märg' ,
'Riik' : 'India' },

{ 'Soil_Type' : 'Punane' , „Niisutus_kättesaadavus” : 'jah' , 'Aakrid' : 210 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'UK' },

{ 'Soil_Type' : 'muu' , „Niisutus_kättesaadavus” : 'ei' , 'Aakrid' : 1000 , 'Mulla_status' : 'Märg' ,
'Riik' : 'USA' },

{ 'Soil_Type' : 'liiv' , „Niisutus_kättesaadavus” : 'ei' , 'Aakrid' : 500 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'India' }]



# looge ülaltoodud andmetest andmeraam

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Kirjutage ülaltoodud DataFrame tabelisse.

agri_df.coalesce( 1 ).write.saveAsTable( 'Põllumajandus_tabel1' )

Väljund:







Näeme, et eelmise PySpark Dataga luuakse üks parketifail.



Näide 2:

Võtke arvesse eelmist DataFrame'i ja kirjutage tabelisse 'Agri_Table2', jagades kirjed veerus 'Riik' olevate väärtuste alusel.

# Kirjutage ülaltoodud DataFrame parameetriga partitionBy tabelisse

agri_df.write.saveAsTable( 'Põllumajandus_tabel2' ,partitionBy=[ 'Riik' ])

Väljund:

Veerus „Riik” on kolm ainulaadset väärtust – „India”, „UK” ja „USA”. Seega luuakse kolm partitsiooni. Igal vaheseinal on parketifailid.

Pyspark.sql.DataFrameReader.table()

Laadime tabeli PySpark DataFrame'i funktsiooni spark.read.table() abil. See võtab ainult ühe parameetri, mis on tee/tabeli nimi. See laadib tabeli otse PySpark DataFrame'i ja kõiki PySpark DataFrame'ile rakendatud SQL-funktsioone saab rakendada ka sellele laaditud DataFrame'ile.

Süntaks:

spark_app.read.table(path/'tabeli_nimi')

Selle stsenaariumi korral kasutame eelmist tabelit, mis loodi PySpark DataFrame'ist. Veenduge, et peate oma keskkonnas juurutama eelmise stsenaariumi koodilõigud.

Näide:

Laadige tabel 'Agri_Table1' DataFrame'i nimega 'loaded_data'.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Väljund:

Näeme, et tabel on laaditud PySpark DataFrame'i.

SQL päringute täitmine

Nüüd täidame mõned SQL-päringud laaditud DataFrame'is, kasutades funktsiooni spark.sql().

# Kasutage ülaltoodud tabeli kõigi veergude kuvamiseks käsku SELECT.

linuxhint_spark_app.sql( 'SELECT * alates Agri_Table1' ).show()

# WHERE klausel

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Kuiv'' ).show()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).show()

Väljund:

  1. Esimene päring kuvab kõik DataFrame'i veerud ja kirjed.
  2. Teine päring kuvab kirjed veeru „Soil_status” alusel. “Dry” elemendiga kirjeid on ainult kolm.
  3. Viimane päring tagastab kaks kirjet 'Acres', mis on suuremad kui 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Funktsiooni insertInto() abil saame lisada DataFrame'i olemasolevasse tabelisse. Seda funktsiooni saame kasutada koos valikuga selectExpr() veergude nimede määratlemiseks ja seejärel tabelisse sisestamiseks. See funktsioon võtab parameetrina ka tabeli Name.

Süntaks:

DataFrame_obj.write.insertInto('Tabeli_nimi')

Selle stsenaariumi korral kasutame eelmist tabelit, mis loodi PySpark DataFrame'ist. Veenduge, et peate oma keskkonnas juurutama eelmise stsenaariumi koodilõigud.

Näide:

Looge kahe kirjega uus DataFrame ja sisestage need tabelisse 'Agri_Table1'.

import pyspark

pyspark.sql-st importige SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# põllumajandusandmed 2 reaga

agri =[{ 'Soil_Type' : 'liiv' , 'Niisutus_kättesaadavus' : 'ei' , 'Aakrid' : 2500 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'USA' },

{ 'Soil_Type' : 'liiv' , 'Niisutus_kättesaadavus' : 'ei' , 'Aakrid' : 1200 , 'Mulla_status' : 'Märg' ,
'Riik' : 'Jaapan' }]

# looge ülaltoodud andmete põhjal andmeraam

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Aakrid' , 'Riik' , 'Niisutus_kättesaadavus' , 'Mulla_tüüp' ,
'Mulla_staatus' ).write.insertInto( 'Põllumajandus_tabel1' )

# Kuva lõplik Agri_Table1

linuxhint_spark_app.sql( 'SELECT * alates Agri_Table1' ).show()

Väljund:

Nüüd on DataFrame'is olevate ridade koguarv 7.

Järeldus

Nüüd saate aru, kuidas PySpark DataFrame'i tabelisse kirjutada, kasutades funktsiooni write.saveAsTable(). See võtab tabeli nime ja muud valikulised parameetrid. Seejärel laadisime selle tabeli PySpark DataFrame'i funktsiooni spark.read.table() abil. See võtab ainult ühe parameetri, mis on tee/tabeli nimi. Kui soovite lisada uue DataFrame'i olemasolevasse tabelisse, kasutage funktsiooni insertInto().