"""
mitmproxy 解密插件(其实没有写具体的解密算法),具体解密算法需要单独实现,
alldecrypt 函数中根据 host 和 what 来判断需要使用的具体的解密方法
-q 选项参数会开启自动解密,当满足过滤条件时自动解密,结果显示在 eventlog 中,指令 e 显示 eventlog
根据 host 设置过滤条件,例子 ~d 127.0.0.1
usage: mitmproxy -s mydecrypt.py -q 开启自动解密
"""
import binascii
import json
import mitmproxy
from mitmproxy import contentviews
from mitmproxy import ctx
from mitmproxy import flowfilter
from mitmproxy.contentviews import base
from mitmproxy.http import HTTPFlow
from mitmproxy.tools.console import flowview
# ============举个例子=============
def decrespone(data):
# 解密 response
return "decres<< " + "需要实现解密算法" + data
def decrequest(data):
# 解密 request
return "decreq>> " + "需要实现解密算法" + data
# ==============结束===============
def alldecrypt(host="127.0.0.1", data=None, what=0):
"""
统一解密入口
:param host: 接口的 host 地址
:param data: 需要解密的数据
:param what: 0 客户端数据 1 服务端数据
:return: 返回解密后的数据
"""
# TODO 根据 host 判断需要使用的解密算法,根据 what 判断解密的是客户端数据或者服务端数据
# 下面一组数据是为测试用的
data = "这是假的数据为了测试"
# 存放需要匹配的 host 与对应的解密方法名称
sks = {("127.0.0.1", 1, "decrespone(data)"),
("127.0.0.1", 0, "decrequest(data)")}
for s in sks:
# host 与 what 同时匹配时得到 解密方法,可能解密客户端和服务端使用不同的算法
if s[0] == host and s[1] == what:
func = s[2]
if func is not None:
return eval(func)
else:
return "没有匹配的 host 数据"
class Mydecrypt(base.View):
"""
插件 ui,用于显示解密的请求数据或者服务器返回数据
"""
name = "MyDecrypt"
prompt = ("decrypt", "d")
def __call__(self, data, **kwargs):
# 获取当前 http 的信息传给 统一解密函数
what = curflow.getwhat()
host = curflow.gethost()
descypt = alldecrypt(host, data, what)
title = "解密"
if what == 0:
title += "请求数据"
else:
title += "返回数据"
return title, base.format_text(descypt)
class Currentflow:
"""
用于保存当前的 http 数据,每次通过 ui 切换时都会更新最新的 http 数据
what = 0 表示当前是 request
what = 1 表示当前是 response
host 当前请求的 host 地址
"""
def __init__(self):
self.what = 0
self.host = ""
def updatewhat(self, what):
self.what = what
def updatehost(self, host):
self.host = host
def gethost(self):
return self.host
def getwhat(self):
return self.what
curflow = Currentflow()
mydecrypt = Mydecrypt()
def mylog_tier(level):
# 修改 warn 类型的值小于 error,只有 warn 类型的日志会显示
return dict(error=0, warn=-1, info=2, debug=3).get(level)
def myview_request(self):
# 更新当前显示的 host 以及 data 类型为 request
curflow.updatewhat(0)
curflow.updatehost(self.flow.request.host)
return self.conn_text(self.flow.request)
def myview_response(self):
# 更新当前显示的 host 以及 data 类型为 response
curflow.updatewhat(1)
curflow.updatehost(self.flow.request.host)
return self.conn_text(self.flow.response)
class Myaddon:
"""
插件主体,实现对插件 ui 的管理,处理请求的实时解密
"""
def __init__(self):
self.filter = None
def configure(self, options, updated):
# 保存最新的 过滤条件
self.filter = options.filter
def response(self, f: HTTPFlow):
"""
全部 response 都会经过这里的处理
:param f: 当前正在处理的 http
:return:
"""
if self.filter is None or self.filter == "":
pass
else:
# 判断当前过滤条件是否合法
flt = flowfilter.parse(self.filter)
if not flt:
pass
else:
if flowfilter.match(self.filter, f):
# 显示当前匹配的数据
ctx.log.warn("返回数据 Raw:<< " + str(binascii.b2a_hex(f.response.content)))
if ctx.master.options.verbosity == 0:
# 自动解密
ctx.log.warn("自动解密返回数据 >> ")
ctx.log.warn(alldecrypt(f.request.host, f.response.content, 1))
else:
pass
def request(self, f: HTTPFlow):
"""
全部 request 都会经过这里的处理
:param f: 当前正在处理的 http
:return:
"""
if self.filter is None or self.filter == "":
pass
else:
# 判断当前过滤条件是否合法
flt = flowfilter.parse(self.filter)
if not flt:
pass
else:
if flowfilter.match(self.filter, f):
# 显示当前匹配的数据
ctx.log.warn("请求数据 Raw:>> " + str(binascii.b2a_hex(f.request.content)))
if ctx.master.options.verbosity == 0:
# 自动解密
ctx.log.warn("自动解密请求数据 >> ")
ctx.log.warn(alldecrypt(f.request.host, f.request.content, 0))
else:
pass
def start(self):
"""
处理插件 ui,以及函数修改
:return:
"""
oldview = contentviews.get(mydecrypt.name)
if oldview is not None:
contentviews.remove(oldview)
contentviews.add(mydecrypt)
# 修改原方法,增加保存当前 flow 的功能
mitmproxy.tools.console.flowview.FlowView.view_response = myview_response
mitmproxy.tools.console.flowview.FlowView.view_request = myview_request
# ctx.master.options.verbosity 此变量控制自动解密,0 开启自动解密,默认关闭
if ctx.master.options.verbosity == 0:
# 处理解密模式时日志显示,只显示重要信息
mitmproxy.log.log_tier = mylog_tier
ctx.log.warn("解密插件加载完成")
def start():
ctx.log.warn("解密插件开始加载")
return Myaddon()
def done():
contentviews.remove(mydecrypt)