溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法

發(fā)布時間:2021-06-28 16:17:56 來源:億速云 閱讀:300 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法”吧!

Reindex會將一個索引的數(shù)據(jù)復制到另一個已存在的索引,但是并不會復制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。

簡單實例如下

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200", // 遠程es的ip和port列表
      "socket_timeout": "1m",
      "connect_timeout": "10s"  // 超時時間設置
    },
    "index": "my_index_name", // 源索引名稱
    "query": {         // 滿足條件的數(shù)據(jù)
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest_index_name"  // 目標索引名稱
  }
}

具體詳細的使用參考

ElasticSearch 6.3版本 Document APIs之Reindex API

elasticsearch 基礎 —— ReIndex

在java中對于reindexapi沒有找到,于是作者采用了別名轉換和全Index查詢加上bulk插入的方式對于索引進行遷移。

但是轉移數(shù)據(jù)實在太慢,所以使用了slice對scorll查詢進行優(yōu)化

Java slice scorll reindex 基于5.6版本

多線程reindex

具體開啟線程數(shù)根據(jù)Index分片數(shù)進行調整,最好和主分片數(shù)相同,本例子為五個分片,同時還使用了別名轉換對索引進行無縫銜接避免數(shù)據(jù)正常插入讀取

 //建新索引
        createUserRecordIndex(newIndexName, typeName);

        //篩選時間
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        RangeQueryBuilder rangeQueryBuilder;
        rangeQueryBuilder = QueryBuilders.rangeQuery("createTime")
                .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT))
                .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT));
        boolQueryBuilder.must(rangeQueryBuilder);

        try {
        
           //多線程處理查詢請求
           List<Future> list = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                SliceBuilder sliceBuilder = new SliceBuilder(i, 5);
                SearchResponse response = EsBuildersServiceUtil.getESClient()
                        .prepareSearch(userRecordAlias)
                        .setTypes(userRecordType)
                        .setQuery(boolQueryBuilder)
                        .setSize(1000).setScroll(new TimeValue(10000))
                        .slice(sliceBuilder)
                        .execute()
                        .actionGet();
                SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response);
                Future submit = threadPoolTaskExecutor.submit(sliceQuery);
                list.add(submit);
            }
            for (Future future : list) {
                future.get();
            }

        } catch (Exception e) {
            log.error("reindex error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR);
        }


        try {
            //別名轉換
            EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet();
            EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet();
        } catch (Exception e) {
            log.error(" convertAlias error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR);
        }

slice線程

class SliceQuery implements Callable {
        private String newIndexName;
        private String typeName;

        private SearchResponse response;

        private SliceQuery(String newIndexName, String typeName, SearchResponse response) {
            this.newIndexName = newIndexName;
            this.typeName = typeName;
            this.response = response;
        }

        @Override
        public Void call() {
            //獲取總數(shù)量
            long totalCount = response.getHits().getTotalHits();
            //計算總次數(shù),每次搜索數(shù)量為分片數(shù)*設置的size大小
            int page = (int) totalCount / 1000;
            operateRecordList(response, newIndexName, typeName);
            for (int i = 0; i < page; i++) {
                //再次發(fā)送請求,并使用上次搜索結果的ScrollId
                response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(10000)).execute()
                        .actionGet();
                operateRecordList(response, newIndexName, typeName);
            }
            return null;
        }
    }

批量插入

  /**
     * 從查詢數(shù)據(jù)中獲取并批量插入Index
     *
     * @param response
     * @param indexName
     * @param typeName
     */
    private void operateRecordList(SearchResponse response, String indexName, String typeName) {
        try {
            SearchHits hits = response.getHits();
            List<AddUserRecordRequest> list = new ArrayList<>();
            for (SearchHit hit : hits) {
                String sourceAsString = hit.getSourceAsString();
                list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class));
            }
            //批量插入
            saveBulkRecord(list, indexName, typeName);
        } catch (Exception e) {
            log.error("operateRecordList error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
        }
    }

    /**
     * 批量插入
     *
     * @param list
     * @param indexName
     * @param typeName
     */
    private void saveBulkRecord(List<AddUserRecordRequest> list, String indexName, String typeName) {
        try {
            BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk();
            for (AddUserRecordRequest recordRequest : list) {
                JSONObject json = JSONObject.fromObject(recordRequest);
                bulkRequest.add(EsBuildersServiceUtil.getESClient()
                        .prepareIndex(indexName, typeName)
                        .setSource(json));
            }
            if (list.size() > 0) {
                bulkRequest.execute().actionGet();
            }
        } catch (Exception e) {
            log.error("saveBulkRecord error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
        }
    }

感謝各位的閱讀,以上就是“Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法”的內容了,經過本文的學習后,相信大家對Elasticsearch reindex及Java使用sliceScorll優(yōu)化查詢的方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI