Da biste dovršili ovu lekciju, na svom stroju morate imati aktivnu instalaciju za Kafku. Pročitajte Instaliranje Apache Kafke na Ubuntuu da biste znali kako to učiniti.
Instaliranje Python klijenta za Apache Kafka
Prije nego što počnemo raditi s Apache Kafkom u programu Python, moramo instalirati Python klijent za Apache Kafka. To se može učiniti pomoću pip (Indeks Python paketa). Evo naredbe da se to postigne:
pip3 instaliraj kafka-pythonOvo će biti brza instalacija na terminalu:
Instalacija Python Kafka klijenta pomoću PIP-a
Sad kad imamo aktivnu instalaciju za Apache Kafka, a instalirali smo i Python Kafka klijent, spremni smo za početak kodiranja.
Izrada producenta
Prvo što morate objaviti poruke na Kafki je proizvođačka aplikacija koja može slati poruke temama u Kafki.
Imajte na umu da su proizvođači Kafke asinkroni proizvođači poruka. To znači da radnje izvršene tijekom objavljivanja poruke na particiji Kafka Topic ne blokiraju. Kako bi stvari bile jednostavne, za ovu ćemo lekciju napisati jednostavni JSON izdavač.
Za početak napravite instancu za Kafka Producer:
iz tvrtke kafka import KafkaProduceruvoz json
uvozni otisak
proizvođač = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.odlagališta (v).encode ('utf-8'))
Atribut bootstrap_servers informira o hostu i portu za Kafka poslužitelj. Atribut value_serializer samo je u svrhu JSON serializacije JSON vrijednosti na koje se nailazi.
Da bismo se poigrali s producentom Kafke, pokušajmo ispisati mjerne podatke povezane s klasterom Producer i Kafka:
metrika = producent.metrika()otisak.otisak (metrika)
Sad ćemo vidjeti sljedeće:
Kafka Mterics
Pokušajmo konačno poslati neku poruku u red Kafka. Jednostavan JSON objekt bit će dobar primjer:
proizvođač.pošalji ('linuxhint', 'topic': 'kafka')The linuxhint je particija teme na kojoj će se poslati JSON objekt. Kada pokrenete skriptu, nećete dobiti izlaz jer se poruka samo šalje na particiju teme. Vrijeme je da napišemo potrošača kako bismo mogli testirati našu aplikaciju.
Stvaranje potrošača
Sada smo spremni uspostaviti novu vezu kao Potrošačka aplikacija i primati poruke iz Kafkine teme. Započnite s izradom nove instance za potrošača:
iz tvrtke kafka import KafkaConsumeriz kafka import TopicPartition
print ('Uspostavljanje veze.')
potrošač = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Sada ovoj vezi dodijelite temu, a također i moguću vrijednost pomaka.
print ('Dodjeljivanje teme.')potrošač.dodijeliti ([TopicPartition ('linuxhint', 2)])
Napokon, spremni smo za ispis poruke:
print ('Dobivanje poruke.')za poruku u potrošaču:
print ("OFFSET:" + str (poruka [0]) + "\ t MSG:" + str (poruka))
Kroz ovo ćemo dobiti popis svih objavljenih poruka na Kafka Consumer Topic Partition. Rezultat ovog programa bit će:
Kafka Potrošač
Samo za brzu referencu, ovdje je kompletna skripta proizvođača:
iz tvrtke kafka import KafkaProduceruvoz json
uvozni otisak
proizvođač = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.odlagališta (v).encode ('utf-8'))
proizvođač.pošalji ('linuxhint', 'topic': 'kafka')
# metrika = proizvođač.metrika()
# otisak.otisak (metrika)
I evo kompletnog potrošačkog programa koji smo koristili:
iz tvrtke kafka import KafkaConsumeriz kafka import TopicPartition
print ('Uspostavljanje veze.')
potrošač = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
print ('Dodjeljivanje teme.')
potrošač.dodijeliti ([TopicPartition ('linuxhint', 2)])
print ('Dobivanje poruke.')
za poruku u potrošaču:
print ("OFFSET:" + str (poruka [0]) + "\ t MSG:" + str (poruka))
Zaključak
U ovoj smo lekciji pogledali kako možemo instalirati i početi koristiti Apache Kafka u našim programima Python. Pokazali smo kako je lako izvoditi jednostavne zadatke povezane s Kafkom u Pythonu s demonstriranim Kafka klijentom za Python.