您好,登錄后才能下訂單哦!
這篇文章主要介紹了python如何使用100多行代碼寫一個(gè)數(shù)據(jù)庫(kù),具有一定借鑒價(jià)值,需要的朋友可以參考下。希望大家閱讀完這篇文章后大有收獲。下面讓小編帶著大家一起了解一下。
數(shù)據(jù)庫(kù)的名字叫WawaDB,是用python實(shí)現(xiàn)的。由此可見python是灰常強(qiáng)大啊!
簡(jiǎn)介
記錄日志的需求一般是這樣的:
只追加,不修改,寫入按時(shí)間順序?qū)懭耄?/p>
大量寫,少量讀,查詢一般查詢一個(gè)時(shí)間段的數(shù)據(jù);
MongoDB的固定集合很好的滿足了這個(gè)需求,但是MongoDB占內(nèi)存比較大,有點(diǎn)兒火穿蚊子,小題大做的感覺。
WawaDB的思路是每寫入1000條日志,在一個(gè)索引文件里記錄下當(dāng)前的時(shí)間和日志文件的偏移量。
然后按時(shí)間詢?nèi)罩緯r(shí),先把索引加載到內(nèi)存中,用二分法查出時(shí)間點(diǎn)的偏移量,再打開日志文件seek到指定位置,這樣就能很快定位用戶需要的數(shù)據(jù)并讀取,而不需要遍歷整個(gè)日志文件。
性能
Core 2 P8400,2.26GHZ,2G內(nèi)存,32 bit win7
寫入測(cè)試:
模擬1分鐘寫入10000條數(shù)據(jù),共寫入5個(gè)小時(shí)的數(shù)據(jù), 插入300萬條數(shù)據(jù),每條數(shù)據(jù)54個(gè)字符,用時(shí)2分51秒
讀取測(cè)試:讀取指定時(shí)間段內(nèi)包含某個(gè)子串的日志
數(shù)據(jù)范圍 遍歷數(shù)據(jù)量 結(jié)果數(shù) 用時(shí)(秒)
5小時(shí) 300萬 604 6.6
2小時(shí) 120萬 225 2.7
1小時(shí) 60萬 96 1.3
30分鐘 30萬 44 0.6
索引
只對(duì)日志記錄的時(shí)間做索引, 簡(jiǎn)介里大概說了下索引的實(shí)現(xiàn),二分查找肯定沒B Tree效率高,但一般情況下也差不了一個(gè)數(shù)量級(jí),而且實(shí)現(xiàn)特別簡(jiǎn)單。
因?yàn)槭窍∈杷饕?,并不是每條日志都有索引記錄它的偏移量,所以讀取數(shù)據(jù)時(shí)要往前多讀一些數(shù)據(jù),防止漏讀,等讀到真正所需的數(shù)據(jù)時(shí)再真正給用戶返回?cái)?shù)據(jù)。
如下圖,比如用戶要讀取25到43的日志,用二分法找25,找到的是30所在的點(diǎn),
索引:0 10 20 30 40 50 日志:|.........|.........|.........|.........|.........|>>>a = [0, 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>>>bisect.bisect_left(a, 43)>>>5>>>a[5]>>>50
所以我們要往前倒一些,從20(30的前一個(gè)刻度)開始讀取日志,21,22,23,24讀取后因?yàn)楸?5小,所以扔掉, 讀到25,26,27,...后返回給用戶
讀取到40(50的前一個(gè)刻度)后就要判斷當(dāng)前數(shù)據(jù)是否大于43了,如果大于43(返回全開區(qū)間的數(shù)據(jù)),就要停止讀了。
整體下來我們只操作了大文件的很少一部分就得到了用戶想要的數(shù)據(jù)。
緩沖區(qū)
為了減少寫入日志時(shí)大量的磁盤寫,索引在append日志時(shí),把buffer設(shè)置成了10k,系統(tǒng)默認(rèn)應(yīng)該是4k。
同理,為了提高讀取日志的效率,讀取的buffer也設(shè)置了10k,也需要根據(jù)你日志的大小做適當(dāng)調(diào)整。
索引的讀寫設(shè)置成了行buffer,每滿一行都要flush到磁盤上,防止讀到不完整的索引行(其實(shí)實(shí)踐證明,設(shè)置了行buffer,還是能讀到半拉的行)。
查詢
啥?要支持SQL,別鬧了,100行代碼怎么支持SQL呀。
現(xiàn)在查詢是直接傳入一個(gè)lambada表達(dá)式,系統(tǒng)遍歷指定時(shí)間范圍內(nèi)的數(shù)據(jù)行時(shí),滿足用戶的lambada條件才會(huì)返回給用戶。
當(dāng)然這樣會(huì)多讀取很多用戶不需要的數(shù)據(jù),而且每行都要進(jìn)行l(wèi)ambda表達(dá)式的運(yùn)算,不過沒辦法,簡(jiǎn)單就是美呀。
以前我是把一個(gè)需要查詢的條件和日志時(shí)間,日志文件偏移量都記錄在索引里,這樣從索引里查找出符合條件的偏移量,然后每條數(shù)據(jù)都如日志文件里seek一次,read一次。這樣好處只有一個(gè),就是讀取的數(shù)據(jù)量少了,但缺點(diǎn)有兩個(gè):
索引文件特別大,不方便加載到內(nèi)存中
每次讀取都要先seek,貌似緩沖區(qū)用不上,特別慢,比連續(xù)讀一個(gè)段的數(shù)據(jù),并用lambda過濾慢四五倍
寫入
前面說過了,只append,不修改數(shù)據(jù),而且每行日志最前面是時(shí)間戳。
多線程
查詢數(shù)據(jù),可以多線程同時(shí)查詢,每次查詢都會(huì)打開一個(gè)新的日志文件的描述符,所以并行的多個(gè)讀取不會(huì)打架。
寫入的話,雖然只是append操作,但不確認(rèn)多線程對(duì)文件進(jìn)行append操作是否安全,所以建議用一個(gè)隊(duì)列,一個(gè)專用線程進(jìn)行寫入。
鎖
沒有任何鎖。
排序
默認(rèn)查詢出來的數(shù)據(jù)是按時(shí)間正序排列,如需其它排序,可取到內(nèi)存后用python的sorted函數(shù)排序,想怎么排就怎么排。
# -*- coding:utf-8 -*- import os import time import bisect import itertools from datetimeimport datetime import logging default_data_dir= './data/' default_write_buffer_size= 1024*10 default_read_buffer_size= 1024*10 default_index_interval= 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def __init__(self, index_name): self.fp_index= open(os.path.join(default_data_dir, index_name+ '.index'),'a+',1) self.indexes,self.offsets,self.index_count= [], [],0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for linein self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError:# 索引如果沒有flush的話,可能讀到有半行的數(shù)據(jù) pass def append_index(self, key, offset): self.index_count+= 1 if self.index_count% default_index_interval== 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left= bisect.bisect_left(self.indexes,str(begin_key)) right= bisect.bisect_left(self.indexes,str(end_key)) left, right= left- 1, right- 1 if left <0: left= 0 if right <0: right= 0 if right >len(self.indexes)- 1: right= len(self.indexes)- 1 logging.debug('get_index_range:%s %s %s %s %s %s',self.indexes[0],self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left],self.offsets[right] class WawaDB: def __init__(self, db_name): self.db_name= db_name self.fp_data_for_append= open(os.path.join(default_data_dir, db_name+ '.db'),'a', default_write_buffer_size) self.index= WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data= open(os.path.join(default_data_dir,self.db_name+ '.db'),'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line= fp_data.readline() find_real_begin_offset= False will_read_len, read_len= int(end_offset)- int(begin_offset),0 while line: read_len+= len(line) if (not find_real_begin_offset)and (line <str(begin_key)): line= fp_data.readline() continue find_real_begin_offset= True if (read_len >= will_read_len)and (line >str(end_key)):break yield line.rstrip('\r\n') line= fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data,basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time= time.mktime(record_time.timetuple()) data= '%s %s %s' % (record_time, data, os.linesep) offset= self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime)and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time= time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset= self.index.get_offsets(begin_time, end_time) for datain self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetimeimport datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args,**kargs): start_time= datetime.now() result= f(*args,**kargs) print '%s take time:%s' % (test_name, (datetime.now()- start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now= datetime.now() begin_time= now- timedelta(hours=5) while begin_time < now: print begin_time for iin range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time+= timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time= datetime.now()- timedelta(hours=3) end_time= begin_time+ timedelta(minutes=120) results= list(db.get_data(begin_time, end_time,lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db= get_db() gen_test_data(db) #db.index.fp_index.flush() db= get_db() test_get_data(db) init() if __name__== '__main__': test()
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享python如何使用100多行代碼寫一個(gè)數(shù)據(jù)庫(kù)內(nèi)容對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,遇到問題就找億速云,詳細(xì)的解決方法等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。