python: Sniffing HTTP traffic on a NIC and storing it in Kafka (reposted from 路哥)
1. Read HTTP data from the NIC and publish it to Kafka
from pykafka import KafkaClient
from scapy.all import Raw, sniff
import logging

logging.getLogger("pykafka").addHandler(logging.StreamHandler())
logging.getLogger("pykafka").setLevel(logging.DEBUG)

client = KafkaClient(zookeeper_hosts="172.20.222.96:2181,172.20.222.97:2181")
topic = client.topics[b'request_message']
producer = topic.get_sync_producer()

def callback(pkt):
    # sniff() passes prn a single packet at a time, not a list.
    if not pkt.haslayer(Raw):
        return
    payload = pkt[Raw].load  # raw TCP payload as bytes
    # Only plaintext requests can match; port 443 traffic is TLS-encrypted,
    # so in practice only port 80 packets pass this check.
    if b'GET' in payload and b'HTTP' in payload:
        producer.produce(payload)  # pykafka's produce() expects bytes

if __name__ == "__main__":
    # Live capture needs root privileges; adjust iface to your own NIC.
    sniff(iface="enp2s0f1", filter="tcp port 80 or tcp port 443", prn=callback)
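If you want to exercise the callback without live capture, scapy's sniff() can also replay a saved capture through its offline parameter. A minimal sketch; http_sample.pcap is a placeholder file name, not something from the original post:

from scapy.all import sniff

# Feed a previously recorded pcap through the same callback as above.
sniff(offline="http_sample.pcap", prn=callback)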
2. The matching Kafka consumer code
from pykafka import KafkaClient
import logging

logging.getLogger("pykafka").addHandler(logging.StreamHandler())
logging.getLogger("pykafka").setLevel(logging.DEBUG)

client = KafkaClient(zookeeper_hosts="172.20.222.96:2181,172.20.222.97:2181")
topic = client.topics[b'request_message']

if __name__ == "__main__":
    # Consumers sharing a group split the partitions between them and
    # resume from the last committed offset.
    consumer = topic.get_balanced_consumer(consumer_group=b'net_url',
                                           auto_commit_enable=True)
    for message in consumer:
        if message is not None:
            print(message.value)
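The loop above blocks indefinitely. If the process needs to shut down cleanly, calling stop() on the consumer releases its partitions before exit; a minimal sketch of one way to do that (the KeyboardInterrupt handling is my addition, not part of the original post):

try:
    for message in consumer:
        if message is not None:
            print(message.value)
except KeyboardInterrupt:
    consumer.stop()  # leave the consumer group cleanly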
3. On the difference between simple_consumer and balanced_consumer:
simple_consumer: simple read mode. Without a consumer group it commits no offsets, so every run starts reading from the earliest retained position in the message queue.
balanced_consumer: the consumer_group parameter names the group the reader belongs to. The current position is committed after reading and the next read resumes from there; partitions are also balanced across consumers in the same group, which guarantees that readers in one group never consume the same data twice. A short sketch contrasting the two follows.
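A sketch of both paths against the same topic; consumer_timeout_ms is added here only so the demo loops terminate once the topic is drained, rather than blocking forever:

from pykafka import KafkaClient

client = KafkaClient(zookeeper_hosts="172.20.222.96:2181,172.20.222.97:2181")
topic = client.topics[b'request_message']

# simple_consumer: no group, no commits -- every run starts again at the
# earliest retained offset.
simple = topic.get_simple_consumer(consumer_timeout_ms=5000)
for message in simple:
    if message is not None:
        print(message.offset, message.value)

# balanced_consumer: offsets are committed under the group name, so a
# restart resumes where the group left off instead of replaying.
balanced = topic.get_balanced_consumer(consumer_group=b'net_url',
                                       auto_commit_enable=True,
                                       consumer_timeout_ms=5000)
for message in balanced:
    if message is not None:
        print(message.offset, message.value)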