溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

elasticsearch引擎怎么啟動

發(fā)布時間:2021-12-16 10:11:35 來源:億速云 閱讀:153 作者:iii 欄目:云計算

這篇文章主要講解了“elasticsearch引擎怎么啟動”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“elasticsearch引擎怎么啟動”吧!

        Engine是ES最接近神Lucene的地方,是對Lucene分布式環(huán)境訪問的一層封裝。這個類的接口,是個命令模式,所以很自然的實現(xiàn)了操作日志 translog。引擎舊版本的實現(xiàn)類叫RobinEngine,新版本改名了,而且加了幾個版本類型。不過這對我們分析源碼影響不大。它主要的2個內(nèi)容是 操作日志數(shù)據(jù)版本。亮點是鎖的運用。我們這里不羅列代碼了,就從 封裝并發(fā)  2個角度看下代碼。

elasticsearch引擎怎么啟動

啟動引擎

        既然是引擎,開始討論之前,先啟動了它再說。ES是用guice管理的實例。我們?yōu)榱酥庇^一點,就直接New了。

public Engine createEngine() throws IOException {
		 	Index index=new Index("index");
		 	
		 	ShardId shardId = new ShardId(index, 1);
	    	
		 	ThreadPool threadPool = new ThreadPool();
		 	
		 	CodecService cs = new CodecService(shardId.index());
		 	
	        AnalysisService as = new AnalysisService(shardId.index());
	        
	        SimilarityService ss = new SimilarityService(shardId.index());
		 	
		 	Translog translog = new FsTranslog(shardId, EMPTY_SETTINGS, new File("c:/fs-translog"));
		 	
		 	DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
		 	
	        Store store =  new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService));
	       
	        SnapshotDeletionPolicy sdp = new SnapshotDeletionPolicy( new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
	        
	        MergeSchedulerProvider<?> scp = new SerialMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
	        
	        MergePolicyProvider<?> mpp = new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(index, EMPTY_SETTINGS));
	        
	        IndexSettingsService iss = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
	        
	        ShardIndexingService sis = new ShardIndexingService(shardId, EMPTY_SETTINGS,new ShardSlowLogIndexingService(shardId,EMPTY_SETTINGS, iss));
	        
	        Engine engine =  new RobinEngine(shardId,EMPTY_SETTINGS,threadPool,iss,sis,null,store, sdp, translog,mpp, scp,as, ss,cs);
	        
	        return engine;
	}

對Lucene的封裝

        封裝后,其實就是用JSON語法查詢,返回JSON內(nèi)容。這個在引擎這個層面,還沒實現(xiàn)。回想下Lucene的CURD。我們再看看ES引擎的CURD是什么樣的。首先對Document封裝了下,叫ParsedDocument

private ParsedDocument createParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl,Analyzer analyzer, BytesReference source, boolean mappingsModified) {
	        Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
	        Field versionField = new NumericDocValuesField("_version", 0);
	        Document document=new Document();
	        document.add(uidField);
	        document.add(versionField);
	        document.add(new Field("_source",source.toBytes()));
	        document.add(new TextField("name", "myname", Field.Store.NO));
	        return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), analyzer, source, mappingsModified);
	}
Engine engine = createEngine();
engine.start();
String json="{\"name\":\"myname\"}";
BytesReference source = new BytesArray(json.getBytes());
ParsedDocument doc = createParsedDocument("2", "myid", "mytype", null, -1, -1, Lucene.STANDARD_ANALYZER, source, false);
//增加
Engine.Create create = new Engine.Create(null, newUid("2"), doc);
engine.create(create);
create.version(2);
create.versionType(VersionType.EXTERNAL);
engine.create(create);
//刪除
Engine.Delete delete = new Engine.Delete("mytype", "myid", newUid("2"));
engine.delete(delete);
//修改類似增加,省略了
//查詢
TopDocs result=null;
try {
    Query query=new MatchAllDocsQuery();
    result=engine.searcher().searcher().search(query,10);
    System.out.println(result.totalHits);
} catch (Exception e) {
    e.printStackTrace();
}
//獲取
Engine.GetResult gr = engine.get(new Engine.Get(true, newUid("1")));
System.out.println(gr.source().source.toUtf8());

從用法上來看這個小家伙,也沒那么厲害啊。沒關(guān)系,到后來他會越來越厲害,直到最后成長成高大上的ES。

query和Get的比較

query的流程我們是清楚的,那Lucene沒有的這個Get操作是什么樣的邏輯呢? 我們看下

最理想的情況下,直接從translog里,返回數(shù)據(jù)。否則會根據(jù)UID在TermsEnum里定位這個詞條(二分查找?),然后根據(jù)指針從fdt文件里,返回內(nèi)容。

就說如果把query分,query和fatch 2個階段的話,他是前面的階段不一樣的。比起termQuery少了一些步驟。

//我簡化了代碼,演示代碼
for(AtomicReaderContext context : reader.leaves()){
			try {
				Terms terms=context.reader().terms("brand");
				TermsEnum te= terms.iterator(null);
				BytesRef br=new BytesRef("陌".getBytes());
				
				if(te.seekExact(br,false)){
					DocsEnum docs = te.docs(null, null);
					for (int d = docs.nextDoc(); d != DocsEnum.NO_MORE_DOCS; d = docs.nextDoc()) {
						System.out.println(reader.document(d).getBinaryValue("_source").utf8ToString());
		            }
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
refresh和Flush的區(qū)別

refresh調(diào)用的就是Lucene的 searcherManager.maybeRefresh() ,F(xiàn)lush的話分,3種情況

static class Flush {

        public static enum Type {
            /**
             * 創(chuàng)建一個新的Writer
             */
            NEW_WRITER,
            /**
             * 提交writer
             */
            COMMIT,
            /**
             * 提交translog.
             */
            COMMIT_TRANSLOG
        }

refresh的話更輕量一點。他默認會自動刷新

@Override
    public TimeValue defaultRefreshInterval() {
        return new TimeValue(1, TimeUnit.SECONDS);
    }

并發(fā)控制

它每一個操作的邏輯都差不多,我們選一個創(chuàng)建看下。引擎是整個ES用鎖用的最頻繁的地方,一層套一層的用啊,不出事,也是怪事一件。

    @Override
    public void create(Create create) throws EngineException {
        rwl.readLock().lock();
        try {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId, failedEngine);
            }
            innerCreate(create, writer);
            dirty = true;
            possibleMergeNeeded = true;
            flushNeeded = true;
        } catch (IOException e) {
            throw new CreateFailedEngineException(shardId, create, e);
        } catch (OutOfMemoryError e) {
            failEngine(e);
            throw new CreateFailedEngineException(shardId, create, e);
        } catch (IllegalStateException e) {
            if (e.getMessage().contains("OutOfMemoryError")) {
                failEngine(e);
            }
            throw new CreateFailedEngineException(shardId, create, e);
        } finally {
            rwl.readLock().unlock();
        }
    }

    private void innerCreate(Create create, IndexWriter writer) throws IOException {
        synchronized (dirtyLock(create.uid())) {
            //....省略下數(shù)據(jù)版本的驗證,不在這里講
            if (create.docs().size() > 1) {
                writer.addDocuments(create.docs(), create.analyzer());
            } else {
                writer.addDocument(create.docs().get(0), create.analyzer());
            }
            Translog.Location translogLocation = translog.add(new Translog.Create(create));

            //versionMap.put(versionKey, new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));

            indexingService.postCreateUnderLock(create);
        }
    }

首先是個讀寫鎖。很多操作都加讀鎖了,寫鎖只有啟動關(guān)閉引擎,recover-phase3 ,NEW_WRITER類型的flush的時候才加的。意思就是。。。在我啟動關(guān)閉引擎,數(shù)據(jù)恢復(fù)和重新創(chuàng)建indexWriter的時候,是不能干任何事情的。接下來是個對象鎖。UID加鎖,用的鎖分段技術(shù),就是ConcurrentHashMap的原理。減少了大量鎖對象的創(chuàng)建。要知道UID可是個海量對象啊。這里要是用個String,分分鐘就OO吧。

private final Object[] dirtyLocks;
this.dirtyLocks = new Object[indexConcurrency * 50]; // 默認最多會有8*50個鎖對象...
for (int i = 0; i < dirtyLocks.length; i++) {
        dirtyLocks[i] = new Object();
}
        
private Object dirtyLock(BytesRef uid) {
        int hash = DjbHashFunction.DJB_HASH(uid.bytes, uid.offset, uid.length);
        // abs returns Integer.MIN_VALUE, so we need to protect against it...
        if (hash == Integer.MIN_VALUE) {
            hash = 0;
        }
        return dirtyLocks[Math.abs(hash) % dirtyLocks.length];
}

感謝各位的閱讀,以上就是“elasticsearch引擎怎么啟動”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對elasticsearch引擎怎么啟動這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI