Cum să implementați fluxul de date în timp real în Python

Cum Sa Implementati Fluxul De Date In Timp Real In Python



Stăpânirea implementării fluxului de date în timp real în Python acționează ca o abilitate esențială în lumea de astăzi implicată în date. Acest ghid explorează pașii de bază și instrumentele esențiale pentru utilizarea fluxului de date în timp real cu autenticitate în Python. De la selectarea unui cadru potrivit, cum ar fi Apache Kafka sau Apache Pulsar, până la scrierea unui cod Python pentru un consum de date fără efort, procesare și vizualizare eficientă, vom dobândi abilitățile necesare pentru a construi canalele de date agile și eficiente în timp real.

Exemplul 1: Implementarea fluxului de date în timp real în Python

Implementarea unui flux de date în timp real în Python este crucială în epoca și lumea actuală bazată pe date. În acest exemplu detaliat, vom parcurge procesul de construire a unui sistem de streaming de date în timp real folosind Apache Kafka și Python în Google Colab.







Pentru a inițializa exemplul înainte de a începe codificarea, construirea unui mediu specific în Google Colab este esențială. Primul lucru pe care trebuie să-l facem este să instalăm bibliotecile necesare. Folosim biblioteca „kafka-python” pentru integrarea Kafka.



! pip instalare kafka-python


Această comandă instalează biblioteca „kafka-python” care oferă funcțiile Python și legăturile pentru Apache Kafka. Apoi, importăm bibliotecile necesare pentru proiectul nostru. Importarea bibliotecilor necesare, inclusiv „KafkaProducer” și „KafkaConsumer” sunt clasele din biblioteca „kafka-python” care ne permit să interacționăm cu brokerii Kafka. JSON este biblioteca Python pentru a lucra cu datele JSON pe care le folosim pentru a serializa și deserializa mesajele.



de la kafka import KafkaProducer, KafkaConsumer
import json


Crearea unui producător Kafka





Acest lucru este important deoarece un producător Kafka trimite datele către un subiect Kafka. În exemplul nostru, creăm un producător care să trimită date simulate în timp real la un subiect numit „subiect în timp real”.

Creăm o instanță „KafkaProducer” care specifică adresa brokerului Kafka ca „localhost:9092”. Apoi, folosim „value_serializer”, o funcție care serializează datele înainte de a le trimite către Kafka. În cazul nostru, o funcție lambda codifică datele ca JSON codificat în UTF-8. Acum, să simulăm câteva date în timp real și să le trimitem la subiectul Kafka.



producator = KafkaProducer ( bootstrap_servers = „localhost:9092” ,
value_serialzer =lambda v: json.dumps ( în ) .codifica ( „utf-8” ) )
# Date simulate în timp real
date = { „sensor_id” : 1 , 'temperatura' : 25.5 , 'umiditate' : 60.2 }
# Trimiterea datelor la subiect
producator.trimite ( „temă în timp real” , date )


În aceste rânduri, definim un dicționar de „date” care reprezintă datele unui senzor simulat. Apoi folosim metoda „trimite” pentru a publica aceste date în „subiectul în timp real”.

Apoi, dorim să creăm un consumator Kafka, iar un consumator Kafka citește datele dintr-un subiect Kafka. Creăm un consumator care să consume și să proceseze mesajele din „subiectul în timp real”. Creăm o instanță „KafkaConsumer”, specificând subiectul pe care vrem să îl consumăm, de exemplu (subiect în timp real) și adresa brokerului Kafka. Apoi, „value_deserializer” este o funcție care deserializează datele care sunt primite de la Kafka. În cazul nostru, o funcție lambda decodifică datele ca JSON codificat în UTF-8.

consumer = KafkaConsumer ( „temă în timp real” ,
bootstrap_servers = „localhost:9092” ,
value_deserializer =lambda x: json.loads ( x.decodare ( „utf-8” ) ) )


Folosim o buclă iterativă pentru a consuma și procesa continuu mesajele din subiect.

# Citirea și procesarea datelor în timp real
pentru mesaj în consumator:
date = mesaj.valoare
imprimare ( f „Date primite: {date}” )


Preluăm valoarea fiecărui mesaj și datele senzorului simulat din interiorul buclei și le imprimăm pe consolă. Rularea producătorului și consumatorului Kafka implică rularea acestui cod în Google Colab și executarea individuală a celulelor de cod. Producătorul trimite datele simulate către subiectul Kafka, iar consumatorul citește și imprimă datele primite.


Analiza ieșirii pe măsură ce se rulează codul

Vom observa datele în timp real care sunt produse și consumate. Formatul datelor poate varia în funcție de simularea noastră sau de sursa reală de date. În acest exemplu detaliat, acoperim întregul proces de configurare a unui sistem de streaming de date în timp real folosind Apache Kafka și Python în Google Colab. Vom explica fiecare linie de cod și semnificația ei în construirea acestui sistem. Streamingul de date în timp real este o capacitate puternică, iar acest exemplu servește drept bază pentru aplicații mai complexe din lumea reală.

Exemplul 2: Implementarea unui flux de date în timp real în Python utilizând datele pieței de valori

Să facem un alt exemplu unic de implementare a unui flux de date în timp real în Python folosind un scenariu diferit; de data aceasta, ne vom concentra pe datele bursiere. Creăm un sistem de streaming de date în timp real care surprinde modificările prețului acțiunilor și le procesează folosind Apache Kafka și Python în Google Colab. După cum sa demonstrat în exemplul anterior, începem prin a configura mediul nostru în Google Colab. Mai întâi, instalăm bibliotecile necesare:

! pip instalare kafka-python yfinance


Aici, adăugăm biblioteca „yfinance”, care ne permite să obținem date de bursă în timp real. Apoi, importăm bibliotecile necesare. Continuăm să folosim clasele „KafkaProducer” și „KafkaConsumer” din biblioteca „kafka-python” pentru interacțiunea cu Kafka. Importăm JSON pentru a lucra cu datele JSON. De asemenea, folosim „yfinance” pentru a obține date în timp real pe bursă. Importăm și biblioteca „timp” pentru a adăuga o întârziere pentru a simula actualizările în timp real.

de la kafka import KafkaProducer, KafkaConsumer
import json
import yfinance la fel de yf
import timp


Acum, creăm un producător Kafka pentru datele stocurilor. Producătorul nostru Kafka primește date de stoc în timp real și le trimite la un subiect Kafka numit „prețul stocului”.

producator = KafkaProducer ( bootstrap_servers = „localhost:9092” ,
value_serialzer =lambda v: json.dumps ( în ) .codifica ( „utf-8” ) )

in timp ce Adevărat:
stoc = yf.Ticker ( 'AAPL' ) # Exemplu: stoc Apple Inc
stoc_date = stoc.istorie ( perioadă = '1d' )
last_price = stoc_date [ 'Închide' ] .iloc [ - 1 ]
date = { 'simbol' : 'AAPL' , 'Preț' : ultimul pret }
producator.trimite ( 'prețul acțiunii' , date )
timp.somn ( 10 ) # Simulați actualizări în timp real la fiecare 10 secunde


Creăm o instanță „KafkaProducer” cu adresa brokerului Kafka în acest cod. În cadrul buclei, folosim „yfinance” pentru a obține cel mai recent preț al acțiunilor pentru Apple Inc. („AAPL”). Apoi, extragem ultimul preț de închidere și îl trimitem la subiectul „preț-acțiune”. În cele din urmă, introducem o întârziere pentru a simula actualizările în timp real la fiecare 10 secunde.

Să creăm un consumator Kafka care să citească și să proceseze datele despre prețul acțiunilor din subiectul „prețul acțiunilor”.

consumer = KafkaConsumer ( 'prețul acțiunii' ,
bootstrap_servers = „localhost:9092” ,
value_deserializer =lambda x: json.loads ( x.decodare ( „utf-8” ) ) )

pentru mesaj în consumator:
stoc_data = mesaj.valoare
imprimare ( f „Date de stoc primite: {stock_data['symbol']} - Preț: {stock_data['price']}' )


Acest cod este similar cu configurația pentru consumator din exemplul anterior. Citește și procesează în mod continuu mesajele din subiectul „preț-acțiune” și tipărește simbolul bursier și prețul pe consolă. Executăm secvențial celulele de cod, de exemplu, una câte una în Google Colab pentru a rula producătorul și consumatorul. Producătorul primește și trimite actualizările în timp real a prețului acțiunilor în timp ce consumatorul citește și afișează aceste date.

! pip instalare kafka-python yfinance
de la kafka import KafkaProducer, KafkaConsumer
import json
import yfinance la fel de yf
import timp
producator = KafkaProducer ( bootstrap_servers = „localhost:9092” ,
value_serialzer =lambda v: json.dumps ( în ) .codifica ( „utf-8” ) )

in timp ce Adevărat:
stoc = yf.Ticker ( 'AAPL' ) # stoc Apple Inc
stoc_date = stoc.istorie ( perioadă = '1d' )
last_price = stoc_date [ 'Închide' ] .iloc [ - 1 ]

date = { 'simbol' : 'AAPL' , 'Preț' : ultimul pret }

producator.trimite ( 'prețul acțiunii' , date )

timp.somn ( 10 ) # Simulați actualizări în timp real la fiecare 10 secunde
consumer = KafkaConsumer ( 'prețul acțiunii' ,
bootstrap_servers = „localhost:9092” ,
value_deserializer =lambda x: json.loads ( x.decodare ( „utf-8” ) ) )

pentru mesaj în consumator:
stoc_data = mesaj.valoare
imprimare ( f „Date de stoc primite: {stock_data['symbol']} - Preț: {stock_data['price']}' )


În analiza rezultatelor după rularea codului, vom observa actualizările în timp real ale prețului acțiunilor pentru Apple Inc. care sunt produse și consumate.

Concluzie

În acest exemplu unic, am demonstrat implementarea fluxului de date în timp real în Python folosind Apache Kafka și biblioteca „yfinance” pentru a captura și procesa datele bursiere. Am explicat în detaliu fiecare linie a codului. Streamingul de date în timp real poate fi aplicat în diferite domenii pentru a construi aplicații din lumea reală în finanțe, IoT și multe altele.