Pyspark.sql.DataFrameWriter.saveAsTable()
Esiteks näeme, kuidas kirjutada olemasolev PySpark DataFrame tabelisse funktsiooni write.saveAsTable() abil. DataFrame'i tabelisse kirjutamiseks on vaja tabeli nime ja muid valikulisi parameetreid, nagu mode, partitionBy jne. Seda hoitakse parketifailina.
Süntaks:
dataframe_obj.write.saveAsTable(tee/tabeli_nimi,režiim,partitionBy,…)
- Tabeli_nimi on tabeli nimi, mis luuakse failist dataframe_obj.
- Tabeli andmeid saame lisada/üle kirjutada režiimi parameetri abil.
- PartitionBy kasutab nendes veergudes olevate väärtuste põhjal partitsioonide loomiseks üksikuid/mitu veergu.
Näide 1:
Looge 5 rea ja 4 veeruga PySpark DataFrame. Kirjutage see andmeraam tabelisse nimega 'Agri_Table1'.
import pyspark
pyspark.sql-st importige SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# põllumajandusandmed 5 rea ja 5 veeruga
agri =[{ 'Soil_Type' : 'must' , „Niisutus_kättesaadavus” : 'ei' , 'Aakrid' : 2500 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'USA' },
{ 'Soil_Type' : 'must' , „Niisutus_kättesaadavus” : 'jah' , 'Aakrid' : 3500 , 'Mulla_status' : 'Märg' ,
'Riik' : 'India' },
{ 'Soil_Type' : 'Punane' , „Niisutus_kättesaadavus” : 'jah' , 'Aakrid' : 210 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'UK' },
{ 'Soil_Type' : 'muu' , „Niisutus_kättesaadavus” : 'ei' , 'Aakrid' : 1000 , 'Mulla_status' : 'Märg' ,
'Riik' : 'USA' },
{ 'Soil_Type' : 'liiv' , „Niisutus_kättesaadavus” : 'ei' , 'Aakrid' : 500 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'India' }]
# looge ülaltoodud andmetest andmeraam
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Kirjutage ülaltoodud DataFrame tabelisse.
agri_df.coalesce( 1 ).write.saveAsTable( 'Põllumajandus_tabel1' )
Väljund:
Näeme, et eelmise PySpark Dataga luuakse üks parketifail.
Näide 2:
Võtke arvesse eelmist DataFrame'i ja kirjutage tabelisse 'Agri_Table2', jagades kirjed veerus 'Riik' olevate väärtuste alusel.
# Kirjutage ülaltoodud DataFrame parameetriga partitionBy tabelisseagri_df.write.saveAsTable( 'Põllumajandus_tabel2' ,partitionBy=[ 'Riik' ])
Väljund:
Veerus „Riik” on kolm ainulaadset väärtust – „India”, „UK” ja „USA”. Seega luuakse kolm partitsiooni. Igal vaheseinal on parketifailid.
Pyspark.sql.DataFrameReader.table()
Laadime tabeli PySpark DataFrame'i funktsiooni spark.read.table() abil. See võtab ainult ühe parameetri, mis on tee/tabeli nimi. See laadib tabeli otse PySpark DataFrame'i ja kõiki PySpark DataFrame'ile rakendatud SQL-funktsioone saab rakendada ka sellele laaditud DataFrame'ile.
Süntaks:
spark_app.read.table(path/'tabeli_nimi')Selle stsenaariumi korral kasutame eelmist tabelit, mis loodi PySpark DataFrame'ist. Veenduge, et peate oma keskkonnas juurutama eelmise stsenaariumi koodilõigud.
Näide:
Laadige tabel 'Agri_Table1' DataFrame'i nimega 'loaded_data'.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Väljund:
Näeme, et tabel on laaditud PySpark DataFrame'i.
SQL päringute täitmine
Nüüd täidame mõned SQL-päringud laaditud DataFrame'is, kasutades funktsiooni spark.sql().
# Kasutage ülaltoodud tabeli kõigi veergude kuvamiseks käsku SELECT.linuxhint_spark_app.sql( 'SELECT * alates Agri_Table1' ).show()
# WHERE klausel
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Kuiv'' ).show()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).show()
Väljund:
- Esimene päring kuvab kõik DataFrame'i veerud ja kirjed.
- Teine päring kuvab kirjed veeru „Soil_status” alusel. “Dry” elemendiga kirjeid on ainult kolm.
- Viimane päring tagastab kaks kirjet 'Acres', mis on suuremad kui 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Funktsiooni insertInto() abil saame lisada DataFrame'i olemasolevasse tabelisse. Seda funktsiooni saame kasutada koos valikuga selectExpr() veergude nimede määratlemiseks ja seejärel tabelisse sisestamiseks. See funktsioon võtab parameetrina ka tabeli Name.
Süntaks:
DataFrame_obj.write.insertInto('Tabeli_nimi')Selle stsenaariumi korral kasutame eelmist tabelit, mis loodi PySpark DataFrame'ist. Veenduge, et peate oma keskkonnas juurutama eelmise stsenaariumi koodilõigud.
Näide:
Looge kahe kirjega uus DataFrame ja sisestage need tabelisse 'Agri_Table1'.
import pysparkpyspark.sql-st importige SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# põllumajandusandmed 2 reaga
agri =[{ 'Soil_Type' : 'liiv' , 'Niisutus_kättesaadavus' : 'ei' , 'Aakrid' : 2500 , 'Mulla_status' : 'Kuiv' ,
'Riik' : 'USA' },
{ 'Soil_Type' : 'liiv' , 'Niisutus_kättesaadavus' : 'ei' , 'Aakrid' : 1200 , 'Mulla_status' : 'Märg' ,
'Riik' : 'Jaapan' }]
# looge ülaltoodud andmete põhjal andmeraam
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Aakrid' , 'Riik' , 'Niisutus_kättesaadavus' , 'Mulla_tüüp' ,
'Mulla_staatus' ).write.insertInto( 'Põllumajandus_tabel1' )
# Kuva lõplik Agri_Table1
linuxhint_spark_app.sql( 'SELECT * alates Agri_Table1' ).show()
Väljund:
Nüüd on DataFrame'is olevate ridade koguarv 7.
Järeldus
Nüüd saate aru, kuidas PySpark DataFrame'i tabelisse kirjutada, kasutades funktsiooni write.saveAsTable(). See võtab tabeli nime ja muud valikulised parameetrid. Seejärel laadisime selle tabeli PySpark DataFrame'i funktsiooni spark.read.table() abil. See võtab ainult ühe parameetri, mis on tee/tabeli nimi. Kui soovite lisada uue DataFrame'i olemasolevasse tabelisse, kasutage funktsiooni insertInto().