我想要一个唯一的cript
(键/值)数据库,可以从同时运行的多个Python脚本访问。
如果script1.py
更新d[2839]
,那么script2.py
应该在几秒钟后查询d[2839]
时看到修改后的值。
>
我想过使用SQLite,但是从多个进程并发写/读似乎不是SQLite的强项(假设script1.py
刚刚修改了d[2839]
,script2.py
的SQLite连接如何知道它必须重新加载数据库的这个特定部分?)
当我想刷新修改时,我也想过锁定文件(但这是相当棘手的),并使用json. dump
进行序列化,然后尝试检测修改,使用json.load
重新加载是否有任何修改等…哦,不,我正在重新发明轮子,并重新发明一个特别低效的键/值数据库!
redis看起来像一个解决方案,但它不正式支持Windows,这同样适用于eveldb。
多个脚本可能想要完全同时写入(即使这是一个非常罕见的事件),有没有办法让DB系统处理这个(由于锁定参数?似乎默认情况下SQLite不能这样做,因为“SQLite支持无限数量的同时读取器,但它在任何时刻都只允许一个写入器。”)
Pythonic的解决方案是什么?
注意:我在视窗上,而这个命令应该有最大1M项(键和值都是整数)。
除了SQLite之外,嵌入式数据存储的摩斯没有针对并发访问的优化,我也对SQLite的并发性能感到好奇,所以我做了一个基准测试:
import time
import sqlite3
import os
import random
import sys
import multiprocessing
class Store():
def __init__(self, filename='kv.db'):
self.conn = sqlite3.connect(filename, timeout=60)
self.conn.execute('pragma journal_mode=wal')
self.conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
self.conn.commit()
def get(self, key):
item = self.conn.execute('select value from "kv" where key=?', (key,))
if item:
return next(item)[0]
def set(self, key, value):
self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
self.conn.commit()
def worker(n):
d = [random.randint(0, 1<<31) for _ in range(n)]
s = Store()
for i in d:
s.set(i, i)
random.shuffle(d)
for i in d:
s.get(i)
def test(c):
n = 5000
start = time.time()
ps = []
for _ in range(c):
p = multiprocessing.Process(target=worker, args=(n,))
p.start()
ps.append(p)
while any(p.is_alive() for p in ps):
time.sleep(0.01)
cost = time.time() - start
print(f'{c:<10d}\t{cost:<7.2f}\t{n/cost:<20.2f}\t{n*c/cost:<14.2f}')
def main():
print(f'concurrency\ttime(s)\tpre process TPS(r/s)\ttotal TPS(r/s)')
for c in range(1, 9):
test(c)
if __name__ == '__main__':
main()
结果在我的4核macOS盒,SSD卷:
concurrency time(s) pre process TPS(r/s) total TPS(r/s)
1 0.65 7638.43 7638.43
2 1.30 3854.69 7709.38
3 1.83 2729.32 8187.97
4 2.43 2055.25 8221.01
5 3.07 1629.35 8146.74
6 3.87 1290.63 7743.78
7 4.80 1041.73 7292.13
8 5.37 931.27 7450.15
结果在8核心Windows服务器2012云服务器,SSD卷:
concurrency time(s) pre process TPS(r/s) total TPS(r/s)
1 4.12 1212.14 1212.14
2 7.87 634.93 1269.87
3 14.06 355.56 1066.69
4 15.84 315.59 1262.35
5 20.19 247.68 1238.41
6 24.52 203.96 1223.73
7 29.94 167.02 1169.12
8 34.98 142.92 1143.39
事实证明,无论并发如何,整体吞吐量都是一致的,并且SQLite在Windows上比macOS慢,希望这有帮助。
由于SQLite写锁是基于数据库的,为了获得更多的TPS,您可以将数据分区到多数据库文件中:
class MultiDBStore():
def __init__(self, buckets=5):
self.buckets = buckets
self.conns = []
for n in range(buckets):
conn = sqlite3.connect(f'kv_{n}.db', timeout=60)
conn.execute('pragma journal_mode=wal')
conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
conn.commit()
self.conns.append(conn)
def _get_conn(self, key):
assert isinstance(key, int)
return self.conns[key % self.buckets]
def get(self, key):
item = self._get_conn(key).execute('select value from "kv" where key=?', (key,))
if item:
return next(item)[0]
def set(self, key, value):
conn = self._get_conn(key)
conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
conn.commit()
我的mac上有20个分区的结果:
concurrency time(s) pre process TPS(r/s) total TPS(r/s)
1 2.07 4837.17 4837.17
2 2.51 3980.58 7961.17
3 3.28 3047.68 9143.03
4 4.02 2486.76 9947.04
5 4.44 2249.94 11249.71
6 4.76 2101.26 12607.58
7 5.25 1903.69 13325.82
8 5.71 1752.46 14019.70
总TPS高于单个数据库文件。
在redis之前,有Memcached(适用于windows)。这里有一个教程。https://realpython.com/blog/python/python-memcache-efficient-caching/
我会考虑两种选择,都是嵌入式数据库
正如这里和这里的回答,应该没问题
链接
BerkeleyDB(BDB)是一个旨在为键/值数据提供高性能嵌入式数据库的软件库
它的设计完全符合你的目的
BDB可以支持数以千计的控制线程或并发进程同时操作高达256 TB的数据库,3适用于各种操作系统,包括大多数类Unix和Windows系统,以及实时操作系统。
它很健壮,已经存在了几年甚至几十年
调用redis
/memcached
/任何其他需要sysops参与的成熟的基于套接字的服务器IMO是任务在位于同一盒子上的两个脚本之间交换数据的开销