kafka-python

消费

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


# Subscribe to the 'test' topic on the broker below.  auto_offset_reset is
# set to 'earliest'; drop that argument to fall back to the library default.
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['10.222.32.122:9092'],
    auto_offset_reset='earliest',
)

# Iterating the consumer yields one record at a time.
for message in consumer:
    # Keys and values arrive as raw bytes and are printed as such here;
    # call e.g. message.value.decode('utf-8') if text is needed.
    fields = (
        message.topic,
        message.partition,
        message.offset,
        message.key,
        message.value,
    )
    print("%s:%d:%d: key=%s value=%s" % fields)

生产

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# Send a single raw-bytes record to the 'test' topic.
producer = KafkaProducer(bootstrap_servers=['10.222.32.122:9092'])

try:
    # send() is asynchronous: it only queues the record and returns a future.
    future = producer.send('test', b'yongsheng8')
    # Wait for the broker's acknowledgement so the script cannot exit before
    # the record is actually delivered; a delivery failure raises here
    # instead of being silently lost.
    future.get(timeout=10)
finally:
    # Flush anything still buffered and release the network resources.
    producer.close(timeout=10)

生产 json message

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

def _to_json_bytes(payload):
    """Serialize *payload* to JSON text and encode it as UTF-8 bytes."""
    return json.dumps(payload).encode('utf-8')


# Producer whose message values are passed through the JSON serializer above.
producer = KafkaProducer(
    value_serializer=_to_json_bytes,
    bootstrap_servers=['10.222.32.122:9092'],
)

# Fields shared by every record; only "id" varies per message.
person = {"name": "里斯本", "age": 23, "gender": "man"}

# Publish ten records with ids 0..9 to the 'json-test3' topic.
for i in range(10):
    data = dict(person, id=i)
    producer.send('json-test3', data)

# Close the producer once all records have been handed to it.
producer.close()

消费 json message

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import datetime
import json
import time
import sys
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


# Consume the 'json-test3' topic, decoding each message value with json.loads.
# No group_id is passed (a 'json-group-test' group was tried and left disabled).
consumer = KafkaConsumer(
    'json-test3',
    value_deserializer=json.loads,
    auto_offset_reset='earliest',
    bootstrap_servers=['10.222.32.122:9092'],
)

for message in consumer:
    payload = message.value
    # First the decoded object as-is, then the same object pretty-printed.
    print(payload)
    print(json.dumps(payload, indent=4))

参考资料

https://kafka-python.readthedocs.io/en/master/usage.html
https://github.com/dpkp/kafka-python
http://localhost:9000/index.html#document-usage