V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
ComplexPug
V2EX  ›  程序员

关于一个经典海量数据的问题

  •  
  •   ComplexPug · 20 天前 · 1270 次点击

    2 个 G 的字符串,找出出现次数最多的 100 个。感觉对搜索到的答案倒不是很理解。 比如

    把 2G 的文件进行分割成大小为 512KB 小文件,总共得到 2048 个小文件,避免一次性读入整个文件造成内存不足。 定义一个长度为 2048 的 hash 表数组,用来统计每个小文件中单词出现的频率。 使用多线程并行遍历 2048 个小文件,针对每个单词进行 hash 取模运算分别存储到长度为 2048 的 hash 表数组中 inthash=Math.abs(word.hashCode() %hashTableSize);hashTables[hash].merge(word, 1, Integer::sum); 接着再遍历这 2048 个 hash 表,把频率前 100 的单词存入小顶堆中 最后,小顶堆中最终得到的 100 个单词,就是 top 100 了。 这种解决方案的核心思想是将大文件分割为多个小文件,然后采用分治和堆的算法,来解决这个问题。

    如果是多线程使用 hashmap ,内存中仅仅去除了重复字符串吧。

    还有一种使用字典树的,但出现频次很低的时候内存占用也不小。

    还有一种是按照 hash(word)%2048 来保存小文件,但不适用于单词频次高使得小文件变大。

    还有一种是把大文件直接拆分成小文件,然后对每个文件求 topK ,最后求整个的 topK 。这个思路虽然时间没问题,但能保证 topK 的正确性吗。如果按照普通的小顶堆 topk 算法,使用的是出现次数,但这里因为没用 hash 分组,的一个字符的出现次数是分开的,并不准确吧,只能保证大概率正确。

    我的想法是要想保证强正确性且不能保证数据安全,按照 hash(word)%2048 保存文件,小文件使用 hashmap ,大文件使用字典树。最后对所有文件的频次使用 topk 算法。

    问过别人但交流了半天他没懂,想问一下我对这个问题的理解对吗,或者有什么好的解决方案。

    第 1 条附言  ·  20 天前

    还有一种是把大文件直接拆分成小文件,然后对每个文件求 topK ,最后求整个的 topK 。这个思路虽然时间没问题,但能保证 topK 的正确性吗。如果按照普通的小顶堆 topk 算法,使用的是出现次数,但这里因为没用 hash 分组,的一个字符的出现次数是分开的,并不准确吧,只能保证大概率正确。

    有些错别字。

    还有一种是把大文件直接拆分成小文件,然后对每个文件求 topK ,最后求整个的 topK 。这个思路虽然时间没问题,但能保证 topK 的正确性吗。如果按照小顶堆 topk 算法,使用的是出现次数,但这里因为没用 hash 分组,一个word的出现次数是分开的,并不准确吧,只能保证大概率正确。

    18 条回复    2024-04-19 12:14:47 +08:00
    jorneyr
        1
    jorneyr  
       20 天前
    1. 统计每个字符串出现的次数。
    2. TopK 算法,就是个小顶堆。
    dusu
        2
    dusu  
       20 天前 via iPhone
    安全方面细分下来应该是 topK 这个 K 取值的问题
    misdake
        3
    misdake  
       20 天前
    我感觉吧,应该是全局总共只有 2048 个 bucket ,依靠 word 的 hash 来 index ,每个 bucket 里是 hashmap 维护这个 hash 的那些单词的词频。
    说白了就是类似于一个大的 concurrent hashmap 的实现,分组加锁或分组搞无锁队列来支持多线程更新。每个小文件读完,就遍历小文件的词频累加。
    整个 2048bucket 结构对外暴露的其实是单个的 hashmap<word, 频数>。最后一步是这个 hashmap 遍历 kv 搞 topk 。
    misdake
        4
    misdake  
       20 天前
    @misdake 分布式的话就是每台机器只负责某些 bucket ,最后一步每个机器自己先求 topk ,然后汇总。
    ComplexPug
        5
    ComplexPug  
    OP
       20 天前
    @misdake 那确实肯定速度快,但可能面试的时候限制你内存大小,或者让你筛选 1000G 的 top100 ,不过分布式肯定还能处理。
    wxf666
        6
    wxf666  
       20 天前
    @ComplexPug 有些极端情况,咋办呢?

    1. 只有俩 500GB 的不同的字符串,但它们 hash 相同。
    2. 所有字符串,都只出现一次。
    ComplexPug
        7
    ComplexPug  
    OP
       18 天前 via Android
    @wxf666 第一个问题,没有特别长的字符串,问题里串长度不会很长(确实没说清楚),第二个问题的话直接按照 hash 分组就好了,然后每组 map 统计次数之后进行 topk 算法,因为 hash 映射的很均匀,每组基本都一样大
    wxf666
        8
    wxf666  
       18 天前
    @ComplexPug #7 关于第二个问题:

    1. 直观上说,每个字符串都只出现一次,最后取哪 100 个字符串呢?

    2. 假如说,题目保证肯定有 100 个字符串,出现次数最多。

    那如果,我这样构造原始文件呢:

    你分割的前 2047 个小文件里,所有字符串都只出现一次,
    在第 2048 个文件里,100 个字符串出现两次。

    你在构造哈希表,甚至每个分组统计 TopK 时,会不会要在内存里,保留几乎整个原始文件的字符串了?

    那内存会不会爆炸了。。
    ComplexPug
        9
    ComplexPug  
    OP
       18 天前
    @wxf666 1.因为这个问题本质是按照出现次数排序,出现次数都一样,那肯定还要制定进一步的规则(比如出现次数相同的优先保留字典序小的),如果不特意指定规则,只能随便取 100 个吧。不过这个问题暂且不用考虑。保证出现多的严格在小的前面即可。2.每组 hashmap 统计出现次数保留前 100 大的。保留 KV 对,而不是 10 个字符串保留十个
    wxf666
        10
    wxf666  
       18 天前
    @ComplexPug #9

    假设你是单线程,计算完第一个文件里,每个字符串次数后,

    你要对那 2048 长度的哈希表,做什么呢?


    1. 啥也不干,继续算第二个文件?

    (若原文件,每个字符串只出现一次,那你不就相当于在内存里,保留整个原文件了吗。。)


    2. 遍历哈希表的,2048 个子哈希表,丢弃 100 名外的?

    (若原文件,每个字符串出现两次,唯独一个字符串,出现 2048 次,但分散在每个分割后的文件里。岂不是会被你,每次遍历完一个文件后,丢弃掉?)


    3. 我理解错了,等待你补充。。
    ComplexPug
        11
    ComplexPug  
    OP
       18 天前
    @wxf666 没太懂你说的“2048 长度的哈希表”是什么,2048 应该是小文件个数,2048 长度是什么。还有你这个分组是直接切分的吗。首先你的 1 肯定是不对的,其次你的 2 可能是这个意思,感觉领悟到你的意思了。

    你是觉得 2 这个做法是有问题的吗,我知道你这种构造方法不对。我发这个帖子就是询问这个问题。而且你这个做法我在贴里说明了,是有正确率问题的。不知道你懂我意思了吗感觉(感觉确实是我的表达能力的问题🥲)。

    我在原文里对这种做法的评价是:并不准确吧,只能保证大概率正确。
    wxf666
        12
    wxf666  
       18 天前
    @ComplexPug #11

    1. 噢,说错了,是 2048 长度的哈希表《数组》。

    2. 感觉你是想说,计算完一个文件后,就分别把 2048 个哈希表里的频率统计,追加到 2048 个文件里?

    计算完所有文件后,再挨个频率文件,计算总频率,且始终只保留频率前 100 的字符串?

    感觉这方法是准确的,但极端情况下,频率文件 > 内存限制时,会爆内存。

    比如,所有字符串只出现一次,原文件大小 / 2048 > 内存限制了。。

    或者,被恶意构造字符串了,使得所有 str.hash() % 2048 后,都挤在同一个文件内。。
    ComplexPug
        13
    ComplexPug  
    OP
       17 天前 via Android
    @wxf666 我回答你比如后面的两个例子吧,第一个例子,按照 hash%2048 之后,一组一组的统计频率(单线程的),当前这组统计完后遍历一下,放到小顶堆(只有一个,不是一组一个)里面,堆大小不超过 100 ,最后答案就是堆里面的 100 个串。一组做完之后释放内存,所以同时只有一个组和小顶堆的内存。对于第二个例子,恶意 hash 吗,我觉得卡 hash 还是比较难的,hash 保证了每组分到的串的种类一样多,是种类不是频率,所以有的组会比较大(就是因为某些个串出现的次数太多了),但是你保存的是文件,文件大一点没关系,内存不是很大就可以了,内存和 map 有关,map 存的是 kv 对,频率很大也是一样的内存。

    等我有空下午写个伪代码。


    感觉确实我确实对帖子正文理解的有问题。直接 hash%2048 分组按照我这个评论就是对的,内存占用比较小的解法。
    ComplexPug
        14
    ComplexPug  
    OP
       17 天前
    @wxf666
    伪代码,不是 python
    ```code
    BLOCK_SIZE = 2048
    TOP_N = 100
    def fun(goal_file):
    files f[BLOCK_SIZE] = empty;
    for str in goal_file:
    f[str.hash()%BLOCK_SIZE].append(str)

    heap<(value,count)> q = empty;# (ordered by count,top is smallest)
    for i in range(BLOCK_SIZE):
    hashmap mp = empty
    for str in f[i]:
    mp[str] += 1
    for value,count in mp:
    if q.size() > TOP_N:
    if q.top().count > count:
    q.pop()
    q.push((value,count))
    else:
    q.push((value,count))
    mp.clear()
    return q.key()
    ```
    wxf666
        15
    wxf666  
       14 天前
    @ComplexPug #14 我用 Lua 写了个(可限制内存)词频统计,并和 DuckDB 比了下速度,感觉还行。


    1. 测试数据

    [知乎回答]( https://www.zhihu.com/answer/1906560411 ) 里,分享的《英文维基百科(仅文本版)》,并简单用脚本预处理了下(去标点、每词一行、转小写)。

    解压后:13.33 GB
    处理后:12.84 GB ,23 亿词(不重复的有 854W )


    2. 测试结果

    - DuckDB (不限制内存):4 分半( 1.0X )
    - Lua(最多存100M文本):7 分半( 1.6X ),01 个共 103MB 临时文件
    - Lua(最多存 10M文本):8 分钟( 1.7X ),23 个共 306MB 临时文件


    3. 测试硬件:i5-8250U 低压 CPU 轻薄本。。


    4. 运行截图




    5. 预处理脚本

    ```shell
    unzip -p enwiki_text.zip | tr -cs "[:alnum:]'-" '\n' | tr [:upper:] [:lower:]
    ```


    6. Lua 脚本( V 站吞空格,所以将行首缩进,都转为全角空格了)

    ```lua
    #!luajit
    local TOP_NUM = 1000
    local MAX_SIZE = tonumber(arg[1] or 16) * 2 ^ 20

    local dict = {}
    local dict_size = 0
    local files = {}
    local words = {}
    local word = ''
    local freq = 0

    function heap_swap(heap, comp, lhs, rhs, func, next)
      if comp(heap[lhs], heap[rhs]) then
       heap[lhs], heap[rhs] = heap[rhs], heap[lhs]
       func(heap, comp, next)
      end
    end

    function heap_up(heap, comp, idx)
      if idx > 1 then
       local pa = math.floor(idx / 2)
       heap_swap(heap, comp, idx, pa, heap_up, pa)
      end
    end

    function heap_down(heap, comp, idx)
      if idx <= #heap / 2 then
       local left, right = idx * 2, idx * 2 + 1
       local son = right > #heap and left or
       (comp(heap[left], heap[right]) and left or right)
       heap_swap(heap, comp, son, idx, heap_down, son)
      end
    end

    function heap_push(heap, comp, val)
      table.insert(heap, val)
      heap_up(heap, comp, #heap)
    end

    function heap_pop(heap, comp)
      heap[1] = heap[#heap]
      table.remove(heap)
      heap_down(heap, comp, 1)
    end

    function heap_adjust(heap, comp, idx)
      heap_down(heap, comp, idx)
      heap_up(heap, comp, idx)
    end

    function comp(lhs, rhs)
      return lhs[1] < rhs[1]
    end

    function sorted_keys(dict)
      local keys = {}
      for k in pairs(dict) do
       table.insert(keys, k)
      end
      table.sort(keys)
      return keys
    end

    function push_word()
      if #word > 0 then
       if #words < TOP_NUM then
        heap_push(words, comp, {freq, word})
       elseif freq > words[1][1] then
        words[1] = {freq, word}
        heap_adjust(words, comp, 1)
       end
      end
    end

    function try_save_dict(new_key)
      dict_size = dict_size + (new_key and #new_key or 0)
      if not new_key or dict_size > MAX_SIZE then
       local file = io.tmpfile()
       for idx, key in ipairs(sorted_keys(dict)) do
        file:write(dict[key], ' ', key, '\n')
       end
       dict = {}
       file:seek('set')
       dict_size = new_key and #new_key
       table.insert(files, {'', 0, file})
      end
    end

    for word in io.lines() do
      dict[word] = (dict[word] or try_save_dict(word) or 0) + 1
    end
    try_save_dict()

    while #files > 0 do
      local file, read_ok = files[1]
      if word == file[1] then
       freq = freq + file[2]
      else
       push_word()
       word, freq = file[1], file[2]
      end
      file[2], read_ok, file[1] = file[3]:read('n', 1, 'l');
     (read_ok and heap_adjust or heap_pop)(files, comp, 1)
    end
    push_word()

    table.sort(words, comp)
    for i = #words, 1, -1 do
      io.write(string.format('%d %s\n', words[i][1], words[i][2]))
    end
    ```
    ComplexPug
        16
    ComplexPug  
    OP
       11 天前 via Android
    明天看看
    ComplexPug
        17
    ComplexPug  
    OP
       11 天前
    看不懂 lua😵,尤其是那个 while 。不过看着像是没按照 hash 分,是直接读取,hash 里面的<K,V>太多就保存起来。那你怎么合并的不同组的相同的 K 的,比如一组里面有<K,2>,另外一组有<K,3>,怎么合并到一起。
    ComplexPug
        18
    ComplexPug  
    OP
       11 天前
    不过看你评测,实际跑起来这个算法还是很不错的嘛。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1780 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 16:18 · PVG 00:18 · LAX 09:18 · JFK 12:18
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.