Producer和Consumer

#!/usr/bin/env python

import threading, logging, time

import multiprocessing

from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP_SERVERS='127.0.0.1:9092'

class Producer(threading.Thread):

def __init__(self):

threading.Thread.__init__(self)

self.stop_event = threading.Event()

def stop(self):

self.stop_event.set()

def run(self):

producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

while not self.stop_event.is_set():

producer.send('my-topic', b"test")

producer.send('my-topic', b"\xc2Hola, mundo!")

time.sleep(1)

producer.close()

#读取数据

class Consumer(multiprocessing.Process):

def __init__(self):

multiprocessing.Process.__init__(self)

self.stop_event = multiprocessing.Event()

def stop(self):

self.stop_event.set()

def run(self):

consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS,

auto_offset_reset='earliest',

consumer_timeout_ms=1000)

#订阅某个topic

consumer.subscribe(['my-topic'])

while not self.stop_event.is_set():

for message in consumer:

print(message)

if self.stop_event.is_set():

break

consumer.close()

def main():

tasks = [

#Producer(),

Consumer()

]

for t in tasks:

t.start()

time.sleep(3600)

for task in tasks:

task.stop()

for task in tasks:

task.join()

if __name__ == "__main__":

logging.basicConfig(

format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',

level=logging.INFO

)

main()

KafkaConsumer

#!/usr/bin/env python

#coding:gbk

#kafka的使用 consumer使用

import kafka import KafkaConsumer

#消费kafka中最新的数据 并且自动提交offsets[消息的偏移量]

consumer = KafkaConsumer('my-topic',

group_id='my-group',

bootstrap_servers=['localhost:9092'])

from message in consumer:

#注意: message ,value都是原始的字节数据,需要decode

#例如: message.value.decode('utf-8')

print ("%s:%d:%d: key=%s value=%s" %s (message.topic, message.partition,

message.offset, message.key,

message.value))

#下面代码展示了kafkaConsumer常用的几个参数

#1:消费kafka中保存最早的数据,kafka默认保存几天的历史数据,不管这些数据是否消费,如果想读取最早打

数据就需要设置如下参数,第二个参数是不自动提交消费数据的offset

KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

#2:消费json 格式的消息:

KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

#3:设置当kafka中没有可消费的数据超时时间

KafkaConsumer(consumer_timeout_ms=1000)#如果1秒内kafka中没有可供消费的数据,自动退出

#如果kafka一个group中同时设置了n个topic,想同时从几个topic中消费数据,代码如下:

#假设有三个topic,topic的名称分别是:topic1=awesome1 topic2=awesome2 topic3=awesome3

consumer = KafkaConsumer()

consumer.subscribe(pattern='^awesome.*')

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐