python2.7版本使用以下代码向Kafka发送数据时正常,但是在python3.7版本使用Kafka报错:return '' % self.async;原因是async是python3.7版本的关键字引起的,通过命令执行pip install kafka-python就可以解决这个问题。 #该代码在2.7版本运行正常,但是3.7版本运行报错:return '' % self.async

# -- coding: UTF-8

import datetime

import json

import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['IP:9092'])

future = producer.send(b'account', json.dumps(

{"method": "get", "step": "1", "type": "test", "testName": "kafka",

"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),

"info": "demo{}".format(1)}))

record_metadata = future.get(timeout=10)

print record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S')

运行之后解决了上述问题,但还是会报错,return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs. 第一反应是Kafka版本不对引起的,就在实例化KafkaProducer类添加了版本将python2.7的代码改为(将上面的代码改为如下,添加版本): # -- coding: UTF-8

import datetime

import json

import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))

future = producer.send(b'account', json.dumps(

{"method": "get", "step": "1", "type": "test", "testName": "kafka",

"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),

"info": "demo{}".format(1)}))

record_metadata = future.get(timeout=10)

print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

还是return kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

发现不行,不是版本的问题,让我怀疑是否连接成功,将上诉代码修改为:

# -- coding: UTF-8

import datetime

import json

import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))

print(producer.config)##打印配置信息

future = producer.send(b'account', json.dumps(

{"method": "get", "step": "1", "type": "test", "testName": "kafka",

"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),

"info": "demo{}".format(1)}))

record_metadata = future.get(timeout=10)

print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

打印信息如上,配置生效了。debug模式也显示Connected。证明连接上了。目前是send函数有问题,3.x版本的send函数有问题。通过排查发现是send函数体里面的 self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)这个调用类函数有问题,接收的参数是str类型,但是我们传的类型是byte类型,将上述代码修改后又会报错

修改send函数的参数topic名称以str类型传入 # -- coding: UTF-8

import datetime

import json

import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='IP:9092',api_version=(1,0,1))

print(producer.config)##打印配置信息

future = producer.send('account', json.dumps(

{"method": "get", "step": "1", "type": "test", "testName": "kafka",

"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),

"info": "demo{}".format(1)}))

record_metadata = future.get(timeout=10)

print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

运行后继续报错

Debug打开,发现是在进入send函数体中, assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))报错,value_bytes是str类型 assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))

assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))

正确的send数据到Kafka代码如下:

# -- coding: UTF-8

import datetime

import json

import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='IP:9092')

future = producer.send('account', json.dumps(

{"method": "get", "step": "1", "type": "test", "testName": "kafka",

"cid": "{0}".format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')),

"info": "demo{}".format(1)}).encode())

record_metadata = future.get(timeout=10)

print( record_metadata, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

Kafka使用相关链接

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐