V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX  ›  wxf666  ›  全部回复第 1 页 / 共 28 页
回复总数  549
1  2  3  4  5  6  7  8  9  10 ... 28  
@Keuin #20 写入量还是太大了。我手撸,只写一遍,都觉得大。。

换成算 SHA-1 ,最差情况,只需要写 203e8 * (20 + 6 该行偏移量) / 2 ^ 30 = 492 GB 即可。

当然,自己写肯定不如久经考验的工具成熟稳定,第一次花的时间精力也多。。



@heguangyu5 #21 C/C++ 新人,看了下排行前三的 HashTable ,感觉每行只需 11 字节即可?

6 字节原始文件偏移量 + 5 字节与原始位置的距离( unordered_dense )或下一节点数组下标( emhash )?

狠一点儿,ceil(log2(6.20 * 2^40)) - 1 + ceil(log2(203e8)) = 77 bits / 行?总共需 182 GB 即可?

剩下几字节,你用来存啥了呢。。
@Keuin #18 你是说,类似这样吗?

awk '{print NR" "$0}' in.txt | sort -u -k2 | sort -n | cut -d' ' -f 2-

感觉写入量翻倍。。而且很难扩展到(可能跨多行的) csv ?


@heguangyu5 #13

1. 平均下来,每行花费 14.4 字节内存,肯定没存原字符串。

hash 冲突时,你要回源文件,具体比较两行吗?那随机读会不会很多。。

换句话说,随机生成 1E 行,又把这 1E 行打乱,追加到原文件。dedup-use-more-mem 会随机读 1E 次吗?

2. dedup-use-less-mem 需要额外写多大文件呢?有多少次随机读写呢?这个支持流式读源文件吗?
@Keuin #11 原帖还要求,保持文本原有顺序诶。。

分块归并排序确实好用,我在原帖也手撸了一个,i5-8250U 每分钟能排序 25 GB 。但读写量还是太大了。。

换成只写入 MD5/SHA-1 值的话,读写量能减少 95%。代价就是有极小概率会哈希冲突。。

但也能通过回原文件比较两行解决。代价就是可能会有不少的随机读,和重复行数量成正比。。
有点感兴趣,问一下楼主:

1. 楼主硬盘读写速度多少?

2. 可以指定限制多少内存完成吗?

3. 有不同的两行,恰好 hash 相同,会出问题吗?

4. 除顺序读一次原文件外,还需要额外读写多少文件吗?

5. 能轻而易举改造成,针对 CSV 文件(可能有字符串跨多行),且现有成绩影响不大,是吗?
@lsk569937453
@drymonfidelia #4

我在原贴 99 楼回复了,用的分块排序、归并去重方法,

6 秒(保持顺序地)去重 2.50 GB 文本( 900 万行,336 字/行),
照此速度,预计 4 小时即能去重完毕 6.20 TB 文本?

这是在七年前 i5-8250U 轻薄本上,限制 1 GB 内存,四线程跑出来的。(截图在原贴)
当然,文本存在内存盘里,读 8G/s ,写 3G/s ,1000 元 2TB 的固态都有的水平,应该不算过分。


@tool2dx #3

精确去重,不应该是每行两两比较吗?

但如果能承受极小概率冲突,用哈希应该会快很多。


@jsjscool #18

大佬是用什么写的,只需要几行就行呢?

我也是用的 merge sort + 多线程,但写了两三百行了。。


@securityCoding #23

能自己手撸一个跑吗?感觉也不算慢呀。。


@noparking188 #25

手撸了一个,不知对你有帮助吗?
C++ 新人,写个去重练练手。

- 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。
- 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?)
- 预计:4 小时能去重完毕 6.20TB ?

新人刚学会写,可能还有诸多不足之处。
写的过程中,还有很多优化点没写。比如:

1. 排序时,子范围太小,转为其他排序方式。
2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。)
3. 分块时,可以去除重复行,减少稍后读写数据量。

继续改进点:

- 转用 hash 去重,大幅减少硬盘读写数据量。
- 只是要承担极小概率重复风险。但 Git 也在用这种方式。。
- 实在不行,发现重复 hash 时,再去读原文件完整比较。

## 截图

https://i.imgur.com/N29c3EY.png

## 代码

```c++
// V 站吞空格,缩进改为全角空格了

#include <queue>
#include <vector>
#include <thread>
#include <cstring>
#include <sstream>
#include <fstream>
#include <iostream>
#include <algorithm>
#include <stdexcept>
#include <filesystem>
#include <string_view>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>

using std::ios;
using std::vector, std::string_view;
using std::to_string, std::ofstream;
namespace fs = std::filesystem;

int max_thread = 8;
size_t max_memory = 1ull << 30;
const auto tmpDir = fs::temp_directory_path();

struct Meta {
   ptrdiff_t offset;
   size_t length;
   friend ofstream& operator<< (ofstream& ofs, const Meta& self) {
     ofs.write(reinterpret_cast<const char*>(&self), sizeof self);
     return ofs;
  }
};

struct Line {
   int chunkIdx{};
   ptrdiff_t offset{};
   string_view str{};
   auto operator> (const Line& other) {
     return std::tie(str, chunkIdx, offset)
      > std::tie(other.str, other.chunkIdx, other.offset);
  }
};

template <class T = char>
class MappedFile {
   int fd = -1;
   const T* ptr{};
public:
   const T* data{};
   size_t size{};

   explicit MappedFile(const fs::path& file) {
     struct stat64 fs{};
     fd = open64(file.c_str(), O_RDONLY);
     if (fd != -1 && fstat64(fd, &fs) != -1) {
       size = static_cast<size_t>(fs.st_size) / sizeof(T);
       data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0));
    }
  }

   MappedFile(const MappedFile& other) = delete;

   MappedFile(MappedFile&& old) noexcept:
     fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) {
     old.fd = -1;
     old.ptr = old.data = nullptr;
  }

  ~MappedFile() {
     if (data) munmap(const_cast<T*>(data), size * sizeof(T));
     if (fd != -1) close(fd);
  }

   auto end() const {
     return data + size;
  }

   operator const T*&() {
     return ptr;
  }
};

template <class Iter>
void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) {
   if (id == 1)
     std::copy_n(src, len, dst);
   if (len > 1) {
     std::thread t;
     size_t half = len / 2;
     if (id < max_thread) // 只在左子树开启新线程
       t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2);
     else
       mergeSort(dst, src, half, max_thread, id * 2);
     mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1);
     if (t.joinable())
       t.join();
     std::merge(src, src + half, src + half, src + len, dst);
  }
}

// 步骤 1:分块,返回块数
int step1_SplitChunks(const fs::path& inFile) {

  // 映射源文件
   MappedFile text {inFile};
   if (!text) throw std::runtime_error("无法打开输入文件");

  // 分块,直到源文件结束
   int chunkCount = 0;
   for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) {

    // 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制
     vector<string_view> lines, sortedLines;
     while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) {
       auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text);
       auto lineLen = (lineEnd ? lineEnd : text.end()) - text;
       lines.emplace_back(text, lineLen);
       text += lineLen + 1;
    }

    // 准备写入(排序后)分块、行数据。
     ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc);
     ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc);
     chunkCount++;

    // 多线程排序行数组
     sortedLines.resize(lines.size());
     mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread);

    // 保存(排序后)每行文本、偏移、长度
     for (auto line: sortedLines) {
       chunkFile << line;
       metaFile << Meta{line.data() - chunkBegin, line.size()};
    }

    // 检查
     if (!chunkFile || !metaFile) {
       std::stringstream buf;
       buf << "写入第 " << chunkCount << " 分块时出错!";
       throw std::runtime_error(buf.str());
    }
  }

   return chunkCount;
}

// 步骤 2:查找重复行
void step2_FindDupLines(int chunkCount) {

   vector<ofstream> chunkDups;
   vector<MappedFile<>> chunkText;
   vector<MappedFile<Meta>> chunkMeta;
   std::priority_queue<Line, vector<Line>, std::greater<>> lines;

  // 映射所有分块的文本、行数据文件,
  // 也准备好记录各分块重复行数据的文件
   for (int idx = 0; idx < chunkCount; idx++) {
     chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt"));
     chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta"));
     chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc);
     lines.push({idx});
  }

  // 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并
   string_view last{};
   while (!lines.empty()) {

    // 与上一行相同,则将偏移量写入,对应分块待删除行名单内
     auto line = lines.top(); lines.pop();
     if (last == line.str && !last.empty())
       chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset);
     last = line.str;

    // 该分块行数据未遍历完,则继续将下一行添加进小根堆中
     auto& text = chunkText[line.chunkIdx];
     auto& meta = chunkMeta[line.chunkIdx];
     if (meta < meta.end()) {
       lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}});
       text += (*meta).length;
       meta++;
    }
  }

  // 检查
   for (auto&& file: chunkDups) {
     if (!file) {
       std::stringstream buf;
       buf << "保存第 " << chunkCount << " 分块删除名单时出错!";
       throw std::runtime_error(buf.str());
    }
  }
}

// 步骤 3:合并分块
void step3_MergeChunks(int chunkCount, const fs::path& outFile) {

   ofstream textOut {outFile, ios::binary | ios::trunc};
   if (!textOut) throw std::runtime_error("无法打开输出文件");

   for (int idx = 0; idx < chunkCount; idx++) {

    // 映射分块(排序后)文本、行数据、删除名单
     MappedFile<> text {tmpDir / (to_string(idx) + ".txt")};
     MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")};
     MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")};

    // 剔除删除名单中的行
     vector<Line> lines; lines.reserve(meta.size);
     for (; meta < meta.end(); text += (*meta++).length) {
       if (dups < dups.end() && *dups == (*meta).offset)
         dups++;
       else
         lines.push_back({idx, (*meta).offset, {text, (*meta).length}});
    }

    // 再按偏移量顺序排序好
     std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) {
       return a.offset < b.offset;
    });

    // 逐行输出
     for (auto&& line: lines)
       textOut << line.str << '\n';
  }

  // 检查
   if (!textOut)
     throw std::runtime_error("写入输出文件时出错!");
}

int main(int argc, const char* argv[]) {

   if (argc < 3) {
     std::stringstream buf;
     buf << "大文本去重并保持顺序工具\n\n"
      << "用法:" << argv[0] << " 输入文件 输出文件 "
      << "[内存限制 MB = " << (max_memory >> 20) << "] "
      << "[线程限制 = " << max_thread << "]";
     std::cerr << buf.str() << std::endl;
     return -1;
  }

   auto inFile = argv[1];
   auto outFile = argv[2];
   if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull;
   if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1);

   auto chunkCount = step1_SplitChunks(inFile);
   step2_FindDupLines(chunkCount);
   step3_MergeChunks(chunkCount, outFile);

  // 清空临时文件
   for (int i = 0; i < chunkCount; i++)
     for (auto&& suffix: {".txt", ".meta", ".dups"})
       fs::remove(tmpDir / (to_string(i) + suffix));
}
```
@phrack #15

突然想起来,zram 可能行不通。。

sha1 的结果,是不是相当于随机数据?随机数据,压缩不了啥吧。。

即,256 GB 内存,当不了 512 GB 用?全都用来存压缩比 100% 后的数据了?


@cabbage #57

需要保留 27 行原始 string ,在小根堆里。

第二步目的:检查出各块中,偏移量非最小的重复行,记录进删除名单中。

第二步时,已经有 26.5 个 240GB 的、排序好的块。

参考多路归并,可以流式构造出有序的 (string, offset, chunk_index)。

当 string 与上一个不同时,说明碰到偏移量最小的新行了(即,全文首次出现)。

当 string 与上一个相同时,说明重复行了,此时往 "to_del_${chunk_index}.txt" 里记录 offset 。

(可以攒多点再写,反正只用了 27 个字符串 + 27 * 1GB 缓冲区,还剩 200+GB 内存呢。。)

以前写过类似的,10+ MB 内存去重 13 GB 文件,里面也有用到多路归并:/t/1031275#reply15
36 37 楼,好像没 @ 成功。。再试一下。。


@opengps #3
@msg7086 #32

如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?


@hbcolorful #17
@NotLongNil #30

用布隆过滤,几十 GB 好像不够。
在线算了下,50 GiB + 15 函数,都会有 1 / 26000 概率出错,大约丢失 80W 行数据?
250 GiB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,不丢失任何数据,也不保留任何重复行?


https://i.imgur.com/F29pmQ6.png https://i.imgur.com/F29pmQ6.png
@dingwen07 #38 是的。37 楼有提及到,用多少哈希函数。


@dode #42

《写入到对应分区下面》这个是缓存尽可能多的文本(如 1GB ),再写入,是吗?
《检索特定行文本,是否在对应分区内存在》这个是如何做到,顺序读的呢?


@tonywangcn #44

平均每十亿条,就误认为一次,某行是重复行,导致丢失该行?
那你要问 @drymonfidelia 愿不愿意丢失几十行数据了。。
@cndenis #14 ,@hbcolorful #17 ,@NotLongNil #30:

用布隆过滤,几十 GB 好像不够。
在线算了下,50 GB + 15 函数,都会有 1 / 25000 概率出错。。
250 GB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,一个不出错?


@phrack #15:

压缩内存,来存 hash ?好像真的可行。。
平均而言,写入 (372 << 30) / 4096 = 1 亿次,就会占满 372 GB 内存页。即,几乎一开始就会启用 zram ?
我在别处看了看,lz4 每秒能有 200W 次 IO ,算下来要 2.8 小时即可?
话说,这个和 Bloom Filter 相比,哪个出错概率小呢?


https://i.imgur.com/F29pmQ6.png https://i.imgur.com/F29pmQ6.png
@dcsuibian #2 ,@opengps #3 ,@msg7086 #32:
如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?

@YTMartian #26 ,@dode #27:
就算固态 4K 随机读写有 10W IOPS ,算下来也要 56 小时吧?

https://i.imgur.com/F29pmQ6.png https://i.imgur.com/F29pmQ6.png
34 楼纠正下数据,实测轻薄本 i5-8250U ,1.5 秒归并排序 320W 个 336 长度的随机字符串,约 6500W 次比较。

- 多线程快排/归并:总计 6017 亿次比较,我的轻薄本 8 线程需 0.5 小时。
- 单线程小根堆:总计 1910 亿次比较,单线程需 1.2 小时。

差不太远。。
想到个方法,预计耗时:10 小时,准确率:100% 剔除重复行。


## 简单流程

1. 分块排序。
2. 同时循环每块,删掉非首次出现的重复行。
3. 分别循环每块,按行号顺序,输出未被删掉的行。


## 详细流程

1. 分块 240GB 文件,每块排序后,写入固态。同时保存每行长度+原始偏移量(约 (240 << 30) / 335 * 8 / 1024 ^ 3 = 5.7 GB )。
2. 利用小根堆,流式排序(按 <string, offset> 排)所有分块每一行。非首次出现行,保存该行偏移量,到相应块的删除名单里。
3. 循环每块,排序原始偏移量、删除名单,按序输出(未被删除的)相应行,至最终文件。


## 耗时计算

1. 顺序读写:9 小时( 3 次顺序读,2 次顺序写,假设都为 1GB/s )。
2. 内存字符串排序:< 1 小时(实测轻薄本 i5-8250U ,每秒归并排序 200W 次 335 长度的随机字符串,约 6900W 次比较)。
- 多线程快排/归并:`(每块行数 = (240 << 30) / 335) * log2(每块行数) * 块数 = 6017 亿` 次比较,我的轻薄本 8 线程需 0.3 小时。
- 单线程小根堆:`202e8 * log2(块数 = 6.2 * 1024 / 240 = 26.5) * 2 = 1910 亿` 次比较,需 0.7 小时。
17 天前
回复了 tramm 创建的主题 数据库 有没有推荐的时序数据库或者其他数据库?
需求是这样吗:每秒最多写入 (100W 在线设备 / 5s/条) = 20W 条数据,每条 (4 << 40) / 6e10 = 73 字节?

没用过太高大上的数据库,只有点朴素的想法。

1. 每小时的数据,存在内存里(需 20W * 3600 * 100 / 2 ^ 30 = 67 GB 内存)
2. 数据库 ID 设计为(时间戳_年~小时 + 设备 ID + 时间戳_分~秒)
3. 整点后,按 (设备 ID, 时间) 顺序,写入这一小时数据,至数据库。(顺序写入,很快的)
4. 如果怕丢失数据,接收到上报信息后,也同步写入到 csv 文件里。
 (也是顺序写入,很快的。如果只是《时间戳,设备 ID 》信息,每秒顺序写入 (10 + 1 + 6 + 1) * 20W / 2 ^ 20 = 3.43 MB 数据而已)

这样搞下来,数据库相当于《读多写少》了。正好可以考虑 SQLite 、DuckDB 等。。

试了下,对于 SQLite ,如果只是存上述主键的话,一小时 7.2 亿数据,需要 10GB 文件,写了 5 分钟。。(平均每秒 200W ,还行)

前几天回帖过,SQLite 可以 0.1 秒,在 1 亿数据里,找什么设备,什么时候离线超过 24 小时(每台设备每分钟上报一次)。

当然,这是缓存了 1 GB 内存的结果。在固态上,完全无缓存,需要几秒钟。

具体要啥查询,题主也没写。。只能联想到前几天,有点相关的场景了。。

如果换 DuckDB ,应该能更快。我没试过。
@wanghr64 #5 老板:帮我统计该员工认真工作时长?

https://i.imgur.com/krir4IG.png https://i.imgur.com/krir4IG.png
18 天前
回复了 irihiyahnj 创建的主题 信息安全 用了 23 年的 QQ 被盗了
@testonly #5 听起来有点像网络世界的房产税?

每年贡献点钱保号?不想交,就会被赶出去?价值越高的号,概率越高?

狠一点,就像毛伊岛一样,强抢了事?

https://i.imgur.com/F29pmQ6.png https://i.imgur.com/F29pmQ6.png
@Archeb #2 以前想让 OpenAI 出海龟汤题给我,它一直不理解是啥。。

我就举个例子(好像涉及杀人啥的),然后就被封号了。。

https://i.imgur.com/krir4IG.png https://i.imgur.com/krir4IG.png
20 天前
回复了 Zaden 创建的主题 MySQL mysql 如何高效获取两条相邻推送时间间隔
@ys1992 #10

我又改了改,速度提升到,仅 0.2 秒了。。

原来大部分时间,都在遍历这张一亿数据的表,查有哪些独立的 point_id 了。。

如果有 point 表,直接从里面抽出所有 point_id 即可。我是手动用 generate_series(1, 100) 模拟的。

https://i.imgur.com/krir4IG.png https://i.imgur.com/krir4IG.png


@Zaden #22:

可能你和我原因一样,想分组 point_id ,查最后时间,数据库居然扫全索引去了。。

其他快的原因,我觉得就是,不断跳来跳去,直接查最接近 24 小时前的时间,而不是每两条时间一一对比。

这需要高度依赖 4K 随机读取。比如,浏览器里运行数据库时,会将数据留存在内存里,自然快一些。

- 本地上测试,同样缓存在内存里时,只需 0.1 秒
- 7 年前垃圾固态上(顺序读 420 MB/s ,4K 随机读 25 MB/s ),且清除系统对文件缓存后,需 10 秒
- 10 年前硬盘上(顺序读 150 MB/s ,4K 随机读 0.65 MB/s ),也清除系统对文件缓存后,需 80 秒。。

我也不知道,为啥硬盘 4K 随机读,比固态差近 40 倍,但耗时才慢 8 倍。。

可能夹杂着一些顺序读取(比如有时跳到相邻页上了?),使得差距没这么大吧。。

总之,我浏览器里都能缓存一亿数据(约 1.3 GB ),对你来说应该也不是啥难事的。


## 0.2 秒截图

https://i.imgur.com/0iDVpvs.png
20 天前
回复了 Zaden 创建的主题 MySQL mysql 如何高效获取两条相邻推送时间间隔
@ys1992 #10 不一定要扫全表吧。。

不断根据索引,查最接近 24 小时前的推送时间,应该只需要检查很少数据量,就能算出来了?


@Zaden 我用最垃圾的 SQLite ,在七年前的 i5-8250U 轻薄本上,效率一般的浏览器 wasm 环境里,试了下,

100 设备、一亿数据(每分钟推送、持续两年),每设备断线十次,每次 1~2 天,

只需 7 秒,就能全找出来了?


## 截图

https://i.imgur.com/lZyiLTp.png


## SQL 测试代码

```sql
-- V 站吞空格,缩进改成全角空格了

-- 建表,当 (point_id, push_time) 索引用
DROP TABLE IF EXISTS data;
CREATE TABLE data (
   point_id INT,
   push_time INT,
   PRIMARY KEY (point_id, push_time)
) WITHOUT ROWID;

-- 添加一亿条数据( 100 设备、每分钟推送、持续两年)
INSERT INTO data
SELECT point.value, time.value
FROM generate_series(1, 100) point
JOIN generate_series(
   unixepoch('2024-05-24', '-2 year'),
   unixepoch('2024-05-24'), 60) time;

-- 删掉断线数据( 100 设备,每台断线 10 次,第 N 次是 id*N 天前,持续 1, 1.1, ..., 1.9 天)
DELETE FROM data
WHERE (point_id, push_time) IN (
   SELECT point_id, push_time
   FROM generate_series(1, 100) point
   JOIN generate_series(1, 10) nth
   CROSS JOIN data
   WHERE point_id = point.value
   AND push_time >= unixepoch('2024-05-24', format('-%f day', nth.value * (point.value + 0.1) - 0.1))
   AND push_time < unixepoch('2024-05-24', format('-%f day', nth.value * point.value - 1))
);

-- 循环每个设备,从今天开始,不断往前找,最接近 24 小时前的推送时间
-- 若俩时间 >= 24 小时,则属于断线过久
WITH RECURSIVE
   t(id, a, b) AS (
     SELECT point_id, unixepoch('2024-05-24'), NULL
     FROM data
     GROUP BY point_id
     UNION ALL
     SELECT id, ifnull((
         SELECT min(push_time)
         FROM data
         WHERE point_id = t.id
         AND push_time > t.a - 86400
         AND push_time < t.a
      ), (
         SELECT max(push_time)
         FROM data
         WHERE point_id = t.id
         AND push_time < t.a - 86400
      )), a
     FROM t
     WHERE a
  )
SELECT
   id "设备 ID",
   datetime(a, 'auto') "最后在线",
   format('%d 天 %d 小时 %d 分钟', (b-a)/86400, (b-a)%86400/3600, (b-a)%3600/60) "断线时长"
FROM t
WHERE b - a >= 86400
ORDER BY id IN (1, 2, 73) DESC, id, a DESC;
```
22 天前
回复了 freewind 创建的主题 数据库 请教多标签查询怎么做效率高?
@yjhatfdu2 #19 我用 SQLite 试了下,感觉用普通 SQL ,也勉强能满足这个需求。

## 结果

- 数据:4000 标签,1 亿数据,50 标签/数据(即,倒排索引中,约 50 亿数据 ID )
- 性能:0.8s 查询一个标签,拥有的 625W 数据 ID ,升序输出
- 体积:倒排索引表,约 11 GB

## 环境

- SYS:Win11
- CPU:i7-4790
- MEM:16 GB ,1600 MHz
- HDD:顺序读 150 MB/s ,4K 随机读 0.65 MB/s

## 多标签呢?

SQLite 不支持 Sort Merge Join ,多个标签一起查时,很慢。。
需要物化每个标签的结果,再由 SQLite 进行布隆匹配、B 树二分查找。。

(测前,都用 RamMap 清除了,系统对文件的缓存)
2 个 625W 数据 ID 的标签,结果 250W ,耗时 4.9 秒,
3 个 625W 数据 ID 的标签,结果 205W ,耗时 8.5 秒,
4 个 625W 数据 ID 的标签,结果 120W ,耗时 11.9 秒。。

不知道能不能用 Virtual Table ,写个表值函数(知道有,没写过),自己实现:

- Sort Merge Join 的效果,流式匹配所有标签,免除物化表、上千万次 B 树匹配的开销
- 提速读取数据 ID ( SQL 中,若写为 `SELECT COUNT(*)`,不计算具体数据 ID ,会提速到 0.1s )

## 倒排索引结构

- 表结构:`(<标签 ID ,高位数据 ID >,低 11 位数据 ID 集合 BLOB )`
- 若 > 146 个低位数据 ID ,第 3 列视为 293 字节的 Bitmap ,但每字节最高位弃用( SQLite 中没有便捷转换 128~255 为单字节 BLOB 的方法)
- 否则,每个低位数据 ID ,转为二字节 UTF-8 字符串( 0~127 需后补 `x'00'`),并串联起来。

## 测试数据

- 标签 ID:`[0, 3999]`
- 数据 ID:`[1, 1e8]`
- 每个数据拥有的标签 ID:`[(数据 ID * 1) % 4000, (数据 ID * 2) % 4000, ..., (数据 ID * 50) % 4000]`
- 标签 ID 为 400 、1600 、2800 、3600 时,拥有的数据 ID 最多,都为 625W 个

## 查询 SQL

```sql
-- V 站吞空格,缩进改为全角空格了
WITH
   t1 AS MATERIALIZED (
     SELECT (data_hi_id << 11) |
       CASE WHEN i.stop = 7 THEN -- 低位数据 ID 量 <= 146 ,逐 2 字节翻译
         IFNULL(unicode(substr(data_lo_ids, j.value, 2)), 0)
       WHEN j.stop & (1 << (6 + j.value)) THEN -- Bitmap ,逐 bit 翻译
         i.value + j.value - 1
       END AS data_id
     FROM tag_data
     JOIN generate_series(7, IIF(length(data_lo_ids) < 293, 1, 293) * 7, 7) i
     JOIN generate_series(
       IIF(i.stop > 7, -6, 1),
       IIF(i.stop > 7, unicode(substr(data_lo_ids, i.value / 7, 1)), length(data_lo_ids)),
       IIF(i.stop > 7, 1, 2)) j
     WHERE tag_id = '《第一个标签 ID 》'
    -- 这句加了好慢。。怀疑是反复计算 data_id 导致
    -- AND data_id IS NOT NULL
  ),
  -- 以下结构类似,省略了
   t2 AS (...),
   t3 AS (...),
   t4 AS (...)
-- 写成 COUNT(*) 且仅 t1 时,飞快
-- 估计是不用算具体 data_id 了
SELECT COUNT(data_id)
FROM t1
JOIN t2 USING(data_id)
JOIN t3 USING(data_id)
JOIN t4 USING(data_id)
-- 匹配到后面,预计数据量很少时,可采用这种方式:
/*
WHERE EXISTS (
   SELECT 1
   FROM tag_data
   WHERE tag_id = '《第四个标签 ID 》'
   AND data_hi_id = data_id >> 11
   AND CASE WHEN octet_length(data_lo_ids) <= 292 THEN
       instr(data_lo_ids, IIF(data_id & 0x780, char(data_id & 0x7FF), char(data_id & 0x7FF, 0)))
     ELSE
       unicode(substr(data_lo_ids, (data_id & 0x7FF) / 7 + 1, 1)) & (1 << (data_id & 0x7FF) % 7)
     END)
*/
```

## 增删改

用触发器实现。但很慢。。

C++ 新人,正好练练手,生成倒排索引库(这都要花十几分钟。。):

```c++
// 同上,缩进是全角空格
#include <string>
#include <vector>
#include <iostream>
#include <string_view>
#include "sqlite_modern_cpp.h"

constexpr size_t MAX_DATA_ID = 1e8;
constexpr size_t MAX_TAG_COUNT = 50;
constexpr size_t MAX_TAG_ID = 4000 - 1;
constexpr size_t MAX_MEMORY = 8ull << 30;
constexpr auto DB_PATH = "D:/out.db";

int main() {

   struct TagData {
     struct {
       uint8_t data_lo_ids[(2048 + 6) / 7]{};
    } data_hi_ids[(MAX_DATA_ID >> 11) + 1]{};
  };

   constexpr int tag_steps = MAX_MEMORY / sizeof(TagData);

   sqlite::database db(DB_PATH);
   db << "CREATE TABLE IF NOT EXISTS tag_data ("
     "   tag_id INT,"
     "   data_hi_id INT,"
     "   count INT,"
     "   data_lo_ids BLOB,"
     "   PRIMARY KEY (tag_id, data_hi_id)"
     ") WITHOUT ROWID";

   std::string blob_buf;
   std::vector<uint16_t> data_lo_ids;
   auto insert_stmt = db << "INSERT INTO tag_data VALUES (?, ?, ?, CAST(? AS BLOB))";

   for (size_t tag_id_beg = 0; tag_id_beg <= MAX_TAG_ID; tag_id_beg += tag_steps) {

     auto tag_id_end = (std::min)(tag_id_beg + tag_steps - 1, MAX_TAG_ID);
     auto tags = std::make_unique<TagData[]>(tag_id_end - tag_id_beg + 1);

     db << "BEGIN";
     std::cout
      << "正在计算 1~" << MAX_DATA_ID << ",每个数的 1~" << MAX_TAG_COUNT << " 倍,÷" << MAX_TAG_ID + 1
      << " 后的余数,是否在 [" << tag_id_beg << ", " << tag_id_end << "] 范围内……" << std::endl;

     for (size_t data_id = 1; data_id <= MAX_DATA_ID; data_id++) {
       for (size_t times = 1; times <= MAX_TAG_COUNT; times++) {
         size_t tag_id = (data_id * times) % (MAX_TAG_ID + 1);
         if (tag_id >= tag_id_beg && tag_id <= tag_id_end)
           tags[tag_id - tag_id_beg]
            .data_hi_ids[data_id >> 11]
            .data_lo_ids[(data_id & 0x7FF) / 7] |= 1 << ((data_id & 0x7FF) % 7);
      }
    }

     for (size_t tag_id = tag_id_beg; tag_id <= tag_id_end; tag_id++) {

       auto& tag = tags[tag_id - tag_id_beg];
       std::cout << "正在写入 tag_id = " << tag_id << " 至数据库……\r";

       for (auto& data_hi: tag.data_hi_ids) {
         data_lo_ids.clear();
         for (size_t i = 0; i < sizeof data_hi.data_lo_ids; i++)
           if (data_hi.data_lo_ids[i])
             for (size_t j = 0; j < 7; j++)
               if (data_hi.data_lo_ids[i] & (1 << j))
                 data_lo_ids.push_back(i * 7 + j);

         if (data_lo_ids.size() > 292 / 2)
           insert_stmt
            << tag_id << (&data_hi - tag.data_hi_ids) << data_lo_ids.size()
            << std::string_view((char *) data_hi.data_lo_ids, sizeof data_hi.data_lo_ids)
            >> []{};

         else if (!data_lo_ids.empty()) {
           blob_buf.clear();
           for (auto lo_id: data_lo_ids)
             if (lo_id > 127)
               blob_buf.append({char(0xC0 | (lo_id >> 6)), char(0x80 | (lo_id & 0x3F))});
             else
               blob_buf.append({char(lo_id), 0});
           insert_stmt
            << tag_id << (&data_hi - tag.data_hi_ids) << data_lo_ids.size() << blob_buf
            >> []{};
        }
      }
    }

     db << "COMMIT";
  }
}
```
1  2  3  4  5  6  7  8  9  10 ... 28  
关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4678 人在线   最高记录 6679   ·     Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.5 · 28ms · UTC 03:55 · PVG 11:55 · LAX 20:55 · JFK 23:55
Developed with CodeLauncher
♥ Do have faith in what you're doing.