V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
firebroo
V2EX  ›  Python

Python 调用 elasticsearch 的 bulk 接口批量插入数据出现内存泄露,导致 OOM

  •  
  •   firebroo ·
    firebroo · 2016-12-22 09:28:45 +08:00 · 13354 次点击
    这是一个创建于 2927 天前的主题,其中的信息可能已经有所发展或是发生改变。

    数据导入脚本如下

    import time
    import sys
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    
    reload(sys)
    sys.setdefaultencoding('utf-8')
    
    def set_mapping(es, index_name = "content_engine", doc_type_name = "en"):
        my_mapping = {
                "en": {
                    "properties": {
                        "a": {
                            "type": "string"
                        },
                        "b": {
                            "type": "string"
                        }
                    }
                }
        }
        create_index = es.indices.create(index = index_name,body = my_mapping)
        mapping_index = es.indices.put_mapping(index = index_name, doc_type = doc_type_name, body = my_mapping)
        if create_index["acknowledged"] != True or mapping_index["acknowledged"] != True:
            print "Index creation failed..."
    
    def set_data(es, input_file, index_name = "content_engine", doc_type_name="en"):
        i = 0
        count = 0
        ACTIONS = []
        for line in open(input_file):
            fields = line.replace("\r\n", "").replace("\n", "").split("----")
            if len(fields) == 2:
                a, b = fields
            else:
                continue
            action = {
                "_index": index_name,
                "_type": doc_type_name,
                "_source": {
                      "a": a,
                      "b": b, 
                }
            }
            i += 1
            ACTIONS.append(action)
            if (i == 500000):
                success, _ = bulk(es, ACTIONS, index = index_name, raise_on_error = True)
                count += success
                i = 0
                ACTIONS = []
    
        success, _ = bulk(es, ACTIONS, index = index_name, raise_on_error=True)
        count += success
        print("insert %s lines" % count)
    
    
    if __name__ == '__main__':
        es = Elasticsearch(hosts=["127.0.0.1:9200"], timeout=5000)
        set_mapping(es)
        set_data(es,sys.argv[1])
    

    数据大概 5 个 G 吧,机器配置虚拟机 24G 内存,刚开始无内存泄露现象,这个 Python 脚本的进程内存一直保持 1G 左右的占用,当插入 1600 w,内存开始持续飙升,最后达到 22G ,导致触发 OOM 机制, Python 进程被内核 kill ,差点怀疑人生。。大家在遇到 Python 内存泄露都是怎么定位的?

    20 条回复    2018-02-02 17:58:17 +08:00
    Zuckonit
        1
    Zuckonit  
       2016-12-22 09:44:06 +08:00
    1 、 gc
    2 、 objgraph
    yzmm
        2
    yzmm  
       2016-12-22 09:51:03 +08:00
    5w bulk 一次,再不行重新建立下 es 对象试试
    yuankui
        3
    yuankui  
       2016-12-22 10:14:00 +08:00
    没有人对你这么烂的代码感兴趣,这是事实,必须承认.

    试试,找个同事或者同学,然后口述你代码逻辑,也许你会自己发现问题~
    firebroo
        4
    firebroo  
    OP
       2016-12-22 10:28:27 +08:00
    @yuankui 你要是发现这代码哪里导致的内存泄露,就说出来,我承认我是渣渣没问题的。
    yuankui
        5
    yuankui  
       2016-12-22 10:51:25 +08:00
    @firebroo 其实我本意不是说你代码烂.

    内存泄露一般出现在循环里面向循环外的容器塞数据,导致内存泄露.

    你代码里的 ACTIONS 变量,在循环里面每次都塞一些数据,然后直到函数结束才释放.

    也就是说, ACTIONS 里面包含整个文件的数据?

    5G 的文件啊,哥.
    yuankui
        6
    yuankui  
       2016-12-22 10:52:52 +08:00
    忽略上面的,代码没仔细看..
    p2p
        7
    p2p  
       2016-12-22 11:22:21 +08:00
    如 2l 说的 减小 bulk 阀值, 直到没有内存问题
    jimmyye
        8
    jimmyye  
       2016-12-22 11:24:33 +08:00
    参考这里: https://github.com/elastic/elasticsearch-py/issues/297
    1.试试用 generator 改写,
    2.因为 bulk 调用 streaming_bulk ,试试调整 chunk_size 、 max_chunk_bytes : http://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.streaming_bulk
    firebroo
        9
    firebroo  
    OP
       2016-12-22 12:46:41 +08:00 via Android
    @p2p 我试过减少 bluk 到 5w ,内存依然炸裂的
    @Zuckonit 我是进程运行一段时间之后产生的内存泄露,有啥工具可以注入 Python 进程查看 gc 情况吗?

    @jimmyye 晚上回去试试。
    miraclinger
        10
    miraclinger  
       2016-12-22 18:37:23 +08:00
    官网给的推荐是 1,000 to 5,000 条数据,文件大小是 5-15MB , https://www.elastic.co/guide/en/elasticsearch/guide/master/bulk.html
    miraclinger
        11
    miraclinger  
       2016-12-22 19:23:33 +08:00
    有个思路是用 linux 的切割命令: split -l 5000 input_file
    再就是用多线程进行批量导入,线程数量最好是 200 个左右
    miraclinger
        12
    miraclinger  
       2016-12-22 19:24:48 +08:00
    有个思路是用 linux 的切割命令: split -l 5000 input_file
    再就是用多线程对分割的文件 进行批量导入,线程数量最好是 200 个左右
    WKPlus
        13
    WKPlus  
       2016-12-22 20:37:44 +08:00
    没用过 python es 的库,但是看你的代码,如果 es 存了 ACTIONS 这个 list 的引用,有可能有内存泄露。把 ACTIONS = []改成 del ACTIONS[:]试下?
    firebroo
        14
    firebroo  
    OP
       2016-12-22 23:06:00 +08:00
    @miraclinger 嗯,我看了你的链接,官方的意思是推荐从一次导入 1000-5000 条开始测试直到找到最佳 performance 吧, 可能我的不是最佳,但是和这个应该没有关系,分割为小文件我导入我想过(现在我朋友推荐我使用 Java 的 API 用 9300 端口走 TCP 导入),但是我其实想找到内存泄露的原因呢。
    @WKPlus 试过了,依然 oom ,我还试过 del 之后用 gc 库显示回收 gc ,也是炸裂。
    firebroo
        15
    firebroo  
    OP
       2016-12-23 00:00:08 +08:00
    miraclinger
        16
    miraclinger  
       2016-12-23 09:56:52 +08:00
    虽然已结贴,但是我还想问下,如果把值调成 5000 ,会出现内存泄露不?因为看了下 github 上的生成器,给我的感觉是一次性导入数据,不知道我有没有看错,如果这样的话,效率会比较低吧。
    enenaaa
        17
    enenaaa  
       2016-12-23 10:44:50 +08:00
    可以在内存飙升的时候看看具体是消耗在哪了。
    貌似有 guppy 之类的工具可用?
    firebroo
        18
    firebroo  
    OP
       2016-12-23 12:03:57 +08:00
    @miraclinger 晚上我测试完了给你结果,我觉得还是会泄露, github 那个它说 bluk 内部有 chunking ,默认好像是 chunking size 是 5000 吧,理解为 5000 个 documents 请求一次 es 的 API 就行。

    @enenaaa 我取 stackoverflow 提问,有人推荐 pypi.python.org/pypi/memory_profiler ,但是我这个情况还是不适用。
    starsliao
        19
    starsliao  
       2018-02-02 13:48:00 +08:00
    @firebroo 哥,您那个改成生成器的方式肿么写的?能贴个代码出来么?感激不尽。。。
    firebroo
        20
    firebroo  
    OP
       2018-02-02 17:58:17 +08:00
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5276 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 09:28 · PVG 17:28 · LAX 01:28 · JFK 04:28
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.