Kuidas rakendada Pythonis reaalajas andmevoogesitust

Kuidas Rakendada Pythonis Reaalajas Andmevoogesitust



Pythonis reaalajas andmete voogesituse juurutamine toimib tänapäeva andmehulga maailmas olulise oskusena. Selles juhendis uuritakse põhilisi samme ja olulisi tööriistu reaalajas andmete voogesituse autentseks kasutamiseks Pythonis. Alates sobiva raamistiku (nt Apache Kafka või Apache Pulsar) valimisest kuni Pythoni koodi kirjutamiseni, mis võimaldab vaevata andmetarbimist, -töötlust ja tõhusat visualiseerimist, omandame vajalikud oskused paindlike ja tõhusate reaalajas andmekanalite loomiseks.

Näide 1: reaalajas andmete voogesituse rakendamine Pythonis

Reaalajas andmete voogesituse rakendamine Pythonis on tänapäeva andmepõhisel ajastul ja maailmas ülioluline. Selles üksikasjalikus näites käsitleme reaalajas andmete voogesituse süsteemi loomist, kasutades Google Colabis Apache Kafkat ja Pythonit.







Näite lähtestamiseks enne kodeerimise alustamist on oluline luua Google Colabis konkreetne keskkond. Esimene asi, mida peame tegema, on installida vajalikud teegid. Kafka integreerimiseks kasutame teeki 'kafka-python'.



! pip installida kafka-python


See käsk installib teegi 'kafka-python', mis pakub Pythoni funktsioone ja Apache Kafka sidumisi. Järgmisena impordime oma projekti jaoks vajalikud teegid. Nõutavate teekide importimine, sealhulgas „KafkaProducer” ja „KafkaConsumer”, on „kafka-python” teegi klassid, mis võimaldavad meil suhelda Kafka maakleritega. JSON on Pythoni teek, mis töötab JSON-andmetega, mida kasutame sõnumite serialiseerimiseks ja deserialiseerimiseks.



kafka impordist KafkaProducer, KafkaConsumer
importida json


Kafka produtsendi loomine





See on oluline, sest Kafka tootja saadab andmed Kafka teemasse. Meie näites loome tootja, kes saadab simuleeritud reaalajas andmed teemale nimega 'reaalajas teema'.

Loome 'KafkaProduceri' eksemplari, mis määrab Kafka maakleri aadressiks 'localhost:9092'. Seejärel kasutame funktsiooni 'value_serializer', mis järjestab andmed enne nende saatmist Kafkale. Meie puhul kodeerib lambda-funktsioon andmed UTF-8-kodeeringuga JSON-ina. Nüüd simuleerime mõningaid reaalajas andmeid ja saadame need Kafka teemasse.



tootja = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
väärtus_serialiseerija =lambda v: json.dumps ( sisse ) .kodeerida ( 'utf-8' ) )
# Simuleeritud reaalajas andmed
andmed = { 'sensor_id' : 1 , 'temperatuur' : 25.5 , 'niiskus' : 60.2 }
# Andmete saatmine teemasse
tootja.saata ( 'reaalajas teema' , andmed )


Nendel ridadel määratleme 'andmete' sõnastiku, mis esindab simuleeritud anduri andmeid. Seejärel kasutame nende andmete avaldamiseks reaalajas teemas 'saatmise' meetodit.

Seejärel tahame luua Kafka tarbija ja Kafka tarbija loeb Kafka teema andmeid. Loome tarbija, kes tarbib ja töötleb sõnumeid 'reaalajas teemas'. Loome 'KafkaConsumer' eksemplari, täpsustades teema, mida soovime tarbida, nt (reaalajas teema) ja Kafka maakleri aadressi. Seejärel on 'value_deserializer' funktsioon, mis deserialiseerib Kafkalt saadud andmed. Meie puhul dekodeerib lambda-funktsioon andmed UTF-8-kodeeringuga JSON-ina.

tarbija = KafkaConsumer ( 'reaalajas teema' ,
bootstrap_servers = 'localhost:9092' ,
väärtus_deserialiseerija =lambda x: json.loads ( x.dekodeerida ( 'utf-8' ) ) )


Teema sõnumite pidevaks tarbimiseks ja töötlemiseks kasutame iteratiivset tsüklit.

# Reaalajas andmete lugemine ja töötlemine
jaoks sõnum sisse tarbija:
andmed = sõnum.väärtus
printida ( f 'Saadud andmed: {data}' )


Toome silmuses iga sõnumi väärtuse ja oma simuleeritud anduri andmed ning prindime need konsooli. Kafka tootja ja tarbija käitamine hõlmab selle koodi käitamist Google Colabis ja koodilahtrite eraldi käivitamist. Tootja saadab simuleeritud andmed Kafka teemasse ning tarbija loeb ja prindib saadud andmed.


Väljundi analüüs koodi käitamisel

Jälgime reaalajas andmeid, mida toodetakse ja tarbitakse. Andmevorming võib varieeruda sõltuvalt meie simulatsioonist või tegelikust andmeallikast. Selles üksikasjalikus näites käsitleme kogu reaalajas andmete voogesituse süsteemi seadistamise protsessi, kasutades Google Colabis Apache Kafkat ja Pythonit. Selgitame iga koodirida ja selle tähtsust selle süsteemi loomisel. Reaalajas andmete voogesitus on võimas võimalus ja see näide on aluseks keerukamatele reaalmaailma rakendustele.

Näide 2: reaalajas andmete voogesituse rakendamine Pythonis, kasutades börsiandmeid

Teeme veel ühe ainulaadse näite reaalajas andmete voogesituse rakendamisest Pythonis, kasutades teistsugust stsenaariumi; seekord keskendume börsiandmetele. Loome reaalajas andmete voogedastussüsteemi, mis fikseerib aktsiahindade muutused ja töötleb neid Google Colabis Apache Kafka ja Pythoniga. Nagu eelmises näites näidatud, alustame oma keskkonna seadistamisega Google Colabis. Esiteks installime vajalikud teegid:

! pip installida kafka-python yfinance


Siia lisame 'yfinance' raamatukogu, mis võimaldab meil saada reaalajas aktsiaturu andmeid. Järgmiseks impordime vajalikud teegid. Jätkame Kafka suhtluseks kafka-python'i teegi klasside „KafkaProducer” ja „KafkaConsumer” kasutamist. Impordime JSON-i, et töötada JSON-andmetega. Reaalajas aktsiaturu andmete saamiseks kasutame ka “yfinance”. Impordime ka ajateegi, et lisada reaalajas värskenduste simuleerimiseks viivitus.

kafka impordist KafkaProducer, KafkaConsumer
importida json
import yfinance nagu yf
importida aega


Nüüd loome laoandmete jaoks Kafka tootja. Meie Kafka tootja saab reaalajas laoandmed ja saadab need Kafka teemale nimega “aktsia hind”.

tootja = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
väärtus_serialiseerija =lambda v: json.dumps ( sisse ) .kodeerida ( 'utf-8' ) )

samas Tõsi:
stock = yf.Ticker ( 'AAPL' ) # Näide: Apple Inc. aktsia
stock_data = stock.ajalugu ( periood = '1d' )
viimane_hind = laoandmed [ 'Sulge' ] .iloc [ - 1 ]
andmed = { 'sümbol' : 'AAPL' , 'hind' : viimane hind }
tootja.saata ( 'börsihind' , andmed )
aeg.uni ( 10 ) # Simuleerige reaalajas värskendusi iga 10 sekundi järel


Loome 'KafkaProduceri' eksemplari, mille Kafka maakleri aadress on selles koodis. Silmuse sees kasutame Apple Inc.-i (AAPL) uusima aktsiahinna hankimiseks sõna „yfinance”. Seejärel eraldame viimase sulgemishinna ja saadame selle teemasse 'aktsia hind'. Lõpuks võtame kasutusele viivituse, et simuleerida reaalajas värskendusi iga 10 sekundi järel.

Loome Kafka tarbija, kes loeb ja töötleb aktsiahinna andmeid teemast 'aktsia hind'.

tarbija = KafkaConsumer ( 'börsihind' ,
bootstrap_servers = 'localhost:9092' ,
väärtus_deserialiseerija =lambda x: json.loads ( x.dekodeerida ( 'utf-8' ) ) )

jaoks sõnum sisse tarbija:
stock_data = sõnum.väärtus
printida ( f 'Saadud laoandmed: {stock_data['symbol']} – hind: {stock_data['price']}' )


See kood sarnaneb eelmise näite tarbija seadistusega. See loeb ja töötleb pidevalt “aktsiahinna” teema sõnumeid ning prindib konsooli aktsia sümboli ja hinna. Käivitame koodilahtrid järjestikku, nt ükshaaval Google Colabis, et käitada tootjat ja tarbijat. Tootja saab ja saadab reaalajas aktsiahindade värskendusi, samal ajal kui tarbija neid andmeid loeb ja kuvab.

! pip installida kafka-python yfinance
kafka impordist KafkaProducer, KafkaConsumer
importida json
import yfinance nagu yf
importida aega
tootja = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
väärtus_serialiseerija =lambda v: json.dumps ( sisse ) .kodeerida ( 'utf-8' ) )

samas Tõsi:
stock = yf.Ticker ( 'AAPL' ) # Apple Inc. aktsia
stock_data = stock.ajalugu ( periood = '1d' )
viimane_hind = laoandmed [ 'Sulge' ] .iloc [ - 1 ]

andmed = { 'sümbol' : 'AAPL' , 'hind' : viimane hind }

tootja.saata ( 'börsihind' , andmed )

aeg.uni ( 10 ) # Simuleerige reaalajas värskendusi iga 10 sekundi järel
tarbija = KafkaConsumer ( 'börsihind' ,
bootstrap_servers = 'localhost:9092' ,
väärtus_deserialiseerija =lambda x: json.loads ( x.dekodeerida ( 'utf-8' ) ) )

jaoks sõnum sisse tarbija:
stock_data = sõnum.väärtus
printida ( f 'Saadud laoandmed: {stock_data['symbol']} – hind: {stock_data['price']}' )


Väljundi analüüsimisel pärast koodi käitamist jälgime Apple Inc.-i aktsiahindade reaalajas värskendusi, mida toodetakse ja tarbitakse.

Järeldus

Selles ainulaadses näites demonstreerisime reaalajas andmete voogesituse rakendamist Pythonis, kasutades aktsiaturu andmete kogumiseks ja töötlemiseks Apache Kafkat ja teeki 'yfinance'. Selgitasime põhjalikult iga koodi rida. Reaalajas andmete voogesitust saab rakendada erinevates valdkondades, et luua reaalmaailma rakendusi rahanduses, asjade internetis ja mujal.