V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
benjiam
V2EX  ›  Python

Python 实践之 400 行 Python 写一个类 Hadoop 的 MapReduce 框架

  •  2
     
  •   benjiam · 2014-11-27 21:56:50 +08:00 · 4262 次点击
    这是一个创建于 3684 天前的主题,其中的信息可能已经有所发展或是发生改变。
    Google 的老三篇已经快问世十年了,但这10年里我只是肤浅的了解了一下Mapreduce的原理,并无深入的实践过程真是惭愧。最近2天,抱恙在家,开始了学习Hadoop 的工作原理。说起来Hadoop 极其复杂,各种派生类盘根错杂。但是其实MapReduce的本身工作原理却并不太复杂。可以说很好懂,简单的来说就是将数据分块,并行的进行MAP 操作,再将并行进行Reduce操作。利用并行计算的优势,减少业务的处理时间。

    为此我花了2天时间,用python 实现了一套类Hadoop的Mapreduce框架。更确切的说这只是一个简单的模型, 用户只要写一个python文件,将业务的 InputSplit , Map, Reduce, OutputFormat 四个接口简单的实现。框架会根据传入的参数,依次调用这几次函数,完成一个最简单Mapreduce的程序了。

    例子一共用了400 行,主要实现以下几个功能,

    一个sort 的客户程序, 实现了 InputSplit , Map, Reduce, OutputFormat 这四个接口,完成具体业务相关的。 它完成的业务 就是sort –k 1 –n xxx.txt | uniq –c | awk’{print $2” “$1}’

    一个server程序,它主要起到了一个消息队列的作用,主要缓存每次业务做完中间结果。
    一个task_master 主程序,他是业务的灵魂,它负责启动各次业务的启动,启动多个子进程模拟多节点情况,并行地运行MAP 和Reduce操作。同时并关注每次业务完成情况,负责整体业务流程。
    其他一些工具函数。比如序列化对象保存到本地文件。


    从宏观的角度看MapReduce的过程 (图摘自 Hadoop技术内幕)


    从图中我们可以知道split , map (group by sort),reduce task 这几个步骤是靠客户程序来完成。
    而系统则是完成整体业务,并将这几个步骤串起来 就完成了Hadoop框架本身的功能。



    这个框架本身极其简陋的,不支持分布式文件系统,只能用本地文件模拟。不支持多机处理只能用本地进程模拟worker。没有单点故障监控,更不支持业务调度这样的复杂功能。
    单纯但从一个业余python开发者的角度看,我认为用400来行代码就实现了一个Hadoop 的Mapreduce的原型说明python 的表现能力和效果都是非常惊人。 是实现项目原型很好的一种选择。

    技术实现:
    1) 动态调用python 模块
    框架实现以后,就应该不会再修改了。它可以一直运行,客户只需要编写符合要求的python 模块,这个框架就能自动去调用了。
    那么如何让框架去调用一个它甚至不知道的名字的模块?
    比如 我要做一个wordcount 的操作, 完成代码编写以后, 我只需要将字符串 “wordcount” 传入框架,框架就会自动去执行 wordcount 类里定义的 InputSplit, Map, Reduce, OutputFormat 这样几个函数。 下一次 我又写了一个 “TOP10” 的操作,我只需要将字符串 “TOP10” 传入框架,框架就会自动去执行 TOP10类里定义的 InputSplit , Map, Reduce, OutputFormat 这样几个函数。

    JAVA里我们有反射能做到这点,Python 里也有类似的方法 (GO是静态语言,我不知道如何实现,有高人指点请指点)代码如下
    def run_task(modename, functionname,arg):
    obj = __import__(modename) # import module
    c = getattr(obj,modename)
    obj = c() # new class
    fun = getattr(obj,functionname)
    fun(arg) # call def

    自此通过参数将 类名的字符串,函数名,参数都以字符串的形式传进去,一切就都好了。

    2)序列化
    每个业务的中间结果都是一些MAP,但是如何传给下一个工序呢? 我采用的方法是 使用python 序列化将中间结果对象,序列化成一个文件,将文件名保存在消息队列中,下一道工序通过消息队列获取相应的文件名,反序列化文件。获得上一次步骤的中间结果,继续操作。

    3)消息队列
    就是一个全局的MAP,以 任务ID 作为主键的一个分层MAP。

    剩下的事情,就是耐心了。我相信一个普通的程序员几个钟头都是能完成的了。

    代码在:

    https://github.com/xiaojiaqi/py_hadoop
    11 条回复    2014-11-30 12:19:15 +08:00
    benjiam
        1
    benjiam  
    OP
       2014-11-27 21:57:44 +08:00
    木有办法贴图。 只能贴点文字
    hahastudio
        3
    hahastudio  
       2014-11-27 22:24:06 +08:00   ❤️ 1
    你这个看来可以部署到多台机器上?挺有意思

    我之前看到了这篇文章:
    https://medium.com/@thechriskiehl/parallelism-in-one-line-40e9b2b36148
    之后也手滑了一个 400 行以内的 MapReduce 模型,不过是单机版的
    https://gist.github.com/hahastudio/401ff4dc382ad75e4d3f
    benjiam
        4
    benjiam  
    OP
       2014-11-27 22:29:24 +08:00
    我只是学习hadoop 时候,顺手写写。 可惜我python很弱,所以代码也比较业余。
    从mapreduce来看,多机的实现的关键 需要一个分布式的文件系统,其次需要一个全局的监控节点。

    如果 简单的加个多机监控节点,用restful 接口替代本地文件接口的话。我的框架是可以多机并行跑的。但是速度未必会快。
    benjiam
        5
    benjiam  
    OP
       2014-11-27 23:03:54 +08:00
    明天一早发微博 欢迎帮转 @如此玄妙
    2232588429
        6
    2232588429  
       2014-11-28 07:09:37 +08:00 via iPhone
    收藏一个,慢慢学习
    helloworld00
        7
    helloworld00  
       2014-11-28 07:13:16 +08:00
    如果只是简单任务并行化的话mapreduce跟mpi几乎没有区别

    mapreduce这一套关键在于把并行化里面可能出现的问题都有一整套相应的解决方案(例如stragger,failure用heartbeat和specuative tasks)解决了,从而使得程序员可以专注于处理写复杂的数据处理程序而不是操心太多并行分布式的东西。。。
    benjiam
        8
    benjiam  
    OP
       2014-11-28 08:26:21 +08:00 via iPad
    mapreduce最大的创新是认为业务是靠map 和reduce可以完成的。mpi只是把任务分布去做。mpi是可以解决所有问题的,map不是。
    gateswong
        9
    gateswong  
       2014-11-29 11:13:35 +08:00 via iPad
    python里可以用colander库做dict的序列化工作
    benjiam
        10
    benjiam  
    OP
       2014-11-29 19:06:01 +08:00
    我用了 pickle 做序列化。
    benjiam
        11
    benjiam  
    OP
       2014-11-30 12:19:15 +08:00
    目标
    1) 多机部署
    2) window linux 同时支持
    3) 至少性能上要比sort 更快!
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1323 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 17:49 · PVG 01:49 · LAX 09:49 · JFK 12:49
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.