Working with Kafka from Python

Python: sniffing HTTP traffic from a network interface and storing it in Kafka. Reposted from: 路哥

1. Read HTTP data from the network interface and produce it to Kafka

from pykafka import KafkaClient
from scapy.all import *
import logging

logging.getLogger("pykafka").addHandler(logging.StreamHandler())
logging.getLogger("pykafka").setLevel(logging.DEBUG)

client = KafkaClient(zookeeper_hosts="172.20.222.96:2181,172.20.222.97:2181")
topic = client.topics['request_message']
producer = topic.get_sync_producer()

def callback(pkt):
    # sniff() calls prn once per captured packet; pull out the raw TCP payload
    if pkt.haslayer(Raw):
        request_text = str(pkt[Raw].load)
        # forward only payloads that look like plain-text HTTP GET requests
        # (traffic on port 443 is TLS-encrypted, so this match normally
        # only fires for port 80)
        if 'HTTP' in request_text and 'GET' in request_text:
            producer.produce(request_text)

if __name__ == "__main__":
    sniff(iface="enp2s0f1", filter="tcp port 80 or tcp port 443", prn=callback)
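Live capture on the interface requires root privileges. To test the callback without touching the wire, scapy's sniff() can also replay a saved capture via its offline parameter; a minimal sketch (the pcap filename is a placeholder):

# Replay a saved capture through the same callback instead of sniffing live.
# "http_sample.pcap" is hypothetical -- substitute any capture containing port-80 traffic.
sniff(offline="http_sample.pcap", prn=callback)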

2. For completeness, the matching Kafka consumer code

from pykafka import KafkaClient
import logging

logging.getLogger("pykafka").addHandler(logging.StreamHandler())
logging.getLogger("pykafka").setLevel(logging.DEBUG)

client = KafkaClient(zookeeper_hosts="172.20.222.96:2181,172.20.222.97:2181")
topic = client.topics['request_message']

if __name__ == "__main__":
    # balanced consumer: offsets are committed under the group name 'net_url'
    consumer = topic.get_balanced_consumer(consumer_group='net_url', auto_commit_enable=True)
    for message in consumer:
        print message.value
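With auto_commit_enable=True an offset can be committed before your code has finished processing a message, so a crash can skip it. If that matters, pykafka also lets you commit manually after processing; a sketch (handle() is a placeholder for your own logic):

# Commit only after a message has been handled, so a crash re-delivers it
# on restart instead of silently skipping it (at-least-once processing).
consumer = topic.get_balanced_consumer(consumer_group='net_url',
                                       auto_commit_enable=False)
for message in consumer:
    handle(message.value)    # placeholder for real processing
    consumer.commit_offsets()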

3. The difference between simple_consumer and balanced_consumer:

simple_consumer: plain read mode; it does not record an offset, so every run starts reading again from the beginning of the message queue.

balanced_consumer: the consumer_group parameter names the group the reader belongs to. The current position is committed after reading, and the next read resumes from there, which guarantees that multiple readers in the same group never consume the same data twice. A short sketch contrasting the two follows below.
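A minimal sketch of the two modes against the topic defined above, using pykafka's get_simple_consumer and get_balanced_consumer:

# simple consumer: no offset bookkeeping by default -- each run rereads from the start
simple = topic.get_simple_consumer()

# balanced consumer: commits its position under the group 'net_url', so a restart
# (or another reader in the same group) resumes where the group left off
balanced = topic.get_balanced_consumer(consumer_group='net_url',
                                       auto_commit_enable=True)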