kafka-python
消费
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# Minimal consumer example: read every record from the 'test' topic and print it.
# auto_offset_reset='earliest' asks the client to start from the oldest
# available record when no committed offset exists (see kafka-python docs).
consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['10.222.32.122:9092'])
# Alternative without auto_offset_reset (uses the library default instead):
# consumer = KafkaConsumer('test', bootstrap_servers=['10.222.32.122:9092'])

# Iterating the consumer yields one record at a time; the loop body must be
# indented under the `for`, otherwise the script fails with IndentationError.
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
生产
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# Minimal producer example: publish one raw-bytes record to the 'test' topic.
producer = KafkaProducer(bootstrap_servers=['10.222.32.122:9092'])
try:
    # send() is asynchronous: it only queues the record and returns a future.
    future = producer.send('test', b'yongsheng8')
    try:
        # Block until the broker acknowledges the record (or 10 s elapse) so the
        # script does not exit before the record has actually been transmitted.
        record_metadata = future.get(timeout=10)
    except KafkaError as err:
        # Report the delivery failure instead of losing it silently.
        print("send failed: %s" % err, file=sys.stderr)
    else:
        print("sent to %s:%d:%d" % (record_metadata.topic, record_metadata.partition, record_metadata.offset))
finally:
    # Release network resources held by the producer.
    producer.close()
生产 JSON 消息
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# JSON producer example: publish ten dict payloads to the 'json-test3' topic.
# value_serializer turns each Python object into UTF-8 encoded JSON bytes
# before it is handed to the client for sending.
producer = KafkaProducer(
    bootstrap_servers=['10.222.32.122:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
try:
    for i in range(10):
        # Same payload every time except for the running "id".
        data = {
            "name": "里斯本",
            "age": 23,
            "gender": "man",
            "id": i
        }
        producer.send('json-test3', data)
finally:
    # close() must run once, after the loop; it is in `finally` so the producer
    # is released even if a send raises. Per the kafka-python docs, close()
    # waits for pending sends to complete.
    producer.close()
消费 JSON 消息
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# JSON consumer example: read records from 'json-test3' and print each payload
# twice -- once as the decoded Python object, once pretty-printed as JSON.
consumer = KafkaConsumer(
    'json-test3',
    # group_id='json-group-test',
    bootstrap_servers=['10.222.32.122:9092'],
    auto_offset_reset='earliest',
    # Each record value is passed through json.loads before it reaches the loop.
    value_deserializer=json.loads
)
# The loop body must be indented under the `for`, otherwise the script fails
# with IndentationError.
for message in consumer:
    print(message.value)
    print(json.dumps(message.value, indent=4))
参考资料
https://kafka-python.readthedocs.io/en/master/usage.html
https://github.com/dpkp/kafka-python
http://localhost:9000/index.html#document-usage （本地地址，仅在本机可访问）