您好,登錄后才能下訂單哦!
[TOC]
使用maven工程構(gòu)建ES Java API的測(cè)試項(xiàng)目,其用到的依賴如下:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
<!-- With Lombok there is no need to hand-write getters/setters for Java Beans;
     they are generated automatically at compile time. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
使用junit進(jìn)行測(cè)試,其使用的全局變量與setUp函數(shù)如下:
// Shared ES transport client, connected in setup() before every test.
private TransportClient client;
private String index = "bigdata"; // index (namespace) the CRUD tests operate on
private String type = "product"; // document type within that index
@Before
public void setup() throws UnknownHostException {
// When connecting to an ES cluster the cluster name must be supplied,
// otherwise the client cannot be created.
Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
client = TransportClient.builder().settings(settings).build();
// Register the three cluster nodes (transport port 9300).
TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
client.addTransportAddresses(ta1, ta2, ta3);
/*settings = client.settings();
Map<String, String> asMap = settings.getAsMap();
for(Map.Entry<String, String> setting : asMap.entrySet()) {
System.out.println(setting.getKey() + "::" + setting.getValue());
}*/
}
/**
 * There are four ways to index a document into ES:
 * 1. a JSON string, 2. a Map, 3. a Java Bean, 4. an XContentBuilder.
 *
 * Way 1: a raw JSON string as the document source.
 */
@Test
public void testAddJSON() {
    String json = "{\"name\":\"sqoop\", \"author\": \"apache\", \"version\": \"1.4.6\"}";
    IndexResponse indexResponse = client.prepareIndex(index, type, "4")
            .setSource(json)
            .get();
    System.out.println(indexResponse.isCreated());
}
/**
 * Indexing a document, way 2: a Map as the document source.
 */
@Test
public void testAddMap() {
    Map<String, Object> doc = new HashMap<String, Object>();
    doc.put("name", "flume");
    doc.put("author", "Cloudera");
    doc.put("version", "1.8.0");
    IndexResponse indexResponse = client.prepareIndex(index, type, "5")
            .setSource(doc)
            .get();
    System.out.println(indexResponse.isCreated());
}
/**
 * Indexing a document, way 3: a Java Bean, serialized to JSON first.
 *
 * If the bean is NOT converted to a JSON string beforehand, ES throws:
 * "The number of object passed must be even but was [1]"
 */
@Test
public void testAddObj() throws JsonProcessingException {
    Product product = new Product("kafka", "linkedIn", "0.10.0.1", "kafka.apache.org");
    String asJson = new ObjectMapper().writeValueAsString(product);
    System.out.println(asJson);
    IndexResponse indexResponse = client.prepareIndex(index, type, "6")
            .setSource(asJson)
            .get();
    System.out.println(indexResponse.isCreated());
}
/**
 * Indexing a document, way 4: building the source with an XContentBuilder.
 */
@Test
public void testAddXContentBuilder() throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
            .field("name", "redis")
            .field("author", "redis")
            .field("version", "3.2.0")
            .field("url", "redis.cn")
            .endObject();
    IndexResponse indexResponse = client.prepareIndex(index, type, "7")
            .setSource(builder)
            .get();
    System.out.println(indexResponse.isCreated());
}
/**
 * Fetches a single document by id and prints its source fields.
 */
@Test
public void testGet() {
    GetResponse getResponse = client.prepareGet(index, type, "6").get();
    Map<String, Object> source = getResponse.getSource();
    // Print every field of the document as key=value.
    for (Map.Entry<String, Object> entry : source.entrySet()) {
        System.out.println(entry.getKey() + "=" + entry.getValue());
    }
}
/**
 * Partial (field-level) update, equivalent to:
 * curl -XPOST http://uplooking01:9200/bigdata/product/AWA184kojrSrzszxL-Zs/_update -d' {"doc":{"name":"sqoop", "author":"apache"}}'
 *
 * For a full-document replacement, use prepareIndex instead of prepareUpdate.
 */
@Test
public void testUpdate() throws Exception {
    // setDoc takes only the fields to change; the rest of the document is kept.
    String partialDoc = "{\"url\": \"http://flume.apache.org\"}";
    UpdateResponse updateResponse = client.prepareUpdate(index, type, "4")
            .setDoc(partialDoc.getBytes())
            .get();
    System.out.println(updateResponse.getVersion());
}
/**
 * Deletes a single document by id and prints the resulting version.
 */
@Test
public void testDelete() {
    DeleteResponse deleteResponse = client.prepareDelete(index, type, "5").get();
    System.out.println(deleteResponse.getVersion());
}
/**
 * Bulk operation: one index request and one update request in a single round trip.
 */
@Test
public void testBulk() {
    IndexRequestBuilder indexRequest = client.prepareIndex(index, type, "8")
            .setSource("{\"name\":\"elasticsearch\", \"url\":\"http://www.elastic.co\"}");
    UpdateRequestBuilder updateRequest = client.prepareUpdate(index, type, "1")
            .setDoc("{\"url\":\"http://hadoop.apache.org\"}");
    BulkResponse bulkResponse = client.prepareBulk()
            .add(indexRequest)
            .add(updateRequest)
            .get();
    // BulkResponse is iterable over the per-item results.
    for (BulkItemResponse item : bulkResponse) {
        System.out.println(item.getId() + "<--->" + item.getVersion());
    }
}
/**
 * Prints the number of documents in the index.
 */
@Test
public void testCount() {
    CountResponse countResponse = client.prepareCount(index).get();
    System.out.println("索引記錄數(shù):" + countResponse.getCount());
}
基于junit進(jìn)行測(cè)試,其用到的setUp函數(shù)和showResult函數(shù)如下:
全局變量與setUp:
// Shared ES transport client, connected in setUp() before every query test.
private TransportClient client;
private String index = "bigdata";
private String type = "product";
// Indices searched by the query tests below.
private String[] indics = {"bigdata", "bank"};
@Before
public void setUp() throws UnknownHostException {
// cluster.name is mandatory when joining an existing ES cluster.
Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
client = TransportClient.builder().settings(settings).build();
TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
client.addTransportAddresses(ta1, ta2, ta3);
}
showResult:
/**
 * Pretty-prints a search response: summary statistics first, then every hit.
 * @param response the search response to dump
 */
private void showResult(SearchResponse response) {
    SearchHits searchHits = response.getHits();
    // Highest document score in the result set.
    System.out.println("maxScore: " + searchHits.getMaxScore());
    // Total number of matching documents (across all pages).
    System.out.println("totalHits: " + searchHits.getTotalHits());
    // Hits actually returned in this page.
    SearchHit[] hitArray = searchHits.getHits();
    System.out.println("當(dāng)前返回結(jié)果記錄條數(shù):" + hitArray.length);
    for (SearchHit hit : hitArray) {
        System.out.println("===================================================");
        System.out.println("version: " + hit.version());
        System.out.println("id: " + hit.getId());
        System.out.println("index: " + hit.getIndex());
        System.out.println("type: " + hit.getType());
        System.out.println("score: " + hit.getScore());
        System.out.println("source: " + hit.getSourceAsString());
    }
}
查詢類型有如下4種:
query and fetch(速度最快)(返回N倍數(shù)據(jù)量)
query then fetch(默認(rèn)的搜索方式)
DFS query and fetch
DFS query then fetch(可以更精確控制搜索打分和排名。)
查看API的注釋如下:
/**
* Same as {@link #QUERY_THEN_FETCH}, except for an initial scatter phase which goes and computes the distributed
* term frequencies for more accurate scoring.
*/
DFS_QUERY_THEN_FETCH((byte) 0),
/**
* The query is executed against all shards, but only enough information is returned (not the document content).
* The results are then sorted and ranked, and based on it, only the relevant shards are asked for the actual
* document content. The return number of hits is exactly as specified in size, since they are the only ones that
* are fetched. This is very handy when the index has a lot of shards (not replicas, shard id groups).
*/
QUERY_THEN_FETCH((byte) 1),
/**
* Same as {@link #QUERY_AND_FETCH}, except for an initial scatter phase which goes and computes the distributed
* term frequencies for more accurate scoring.
*/
DFS_QUERY_AND_FETCH((byte) 2),
/**
* The most naive (and possibly fastest) implementation is to simply execute the query on all relevant shards
* and return the results. Each shard returns size results. Since each shard already returns size hits, this
* type actually returns size times number of shards results back to the caller.
*/
QUERY_AND_FETCH((byte) 3),
關(guān)于DFS的說明:
DFS是什么縮寫?
這個(gè)D可能是Distributed,F可能是frequency的縮寫,至于S可能是Scatter的縮寫,整個(gè)單詞可能是分布式詞頻率和
文檔頻率散發(fā)的縮寫。
初始化散發(fā)是一個(gè)什么樣的過程?
從es的官方網(wǎng)站我們可以發(fā)現(xiàn),初始化散發(fā)其實(shí)就是在進(jìn)行真正的查詢之前,先把各個(gè)分片的詞頻率和文檔頻率收集一
下,然后進(jìn)行詞搜索的時(shí)候,各分片依據(jù)全局的詞頻率和文檔頻率進(jìn)行搜索和排名。顯然如果使用
DFS_QUERY_THEN_FETCH這種查詢方式,效率是最低的,因為一個(gè)搜索,可能要請(qǐng)求3次分片。但,使用DFS方法,搜索
精度應(yīng)該是最高的。
總結(jié):
總結(jié)一下,從性能考慮QUERY_AND_FETCH是最快的,DFS_QUERY_THEN_FETCH是最慢的。從搜索的準(zhǔn)確度來說,DFS要
比非DFS的準(zhǔn)確度更高。
/**
 * 1. Exact-match query (termQuery): matches one document field against a concrete value.
 */
@Test
public void testSearch2() {
    // prepareSearch takes the list of indices to search;
    // without a query it would match every document.
    SearchRequestBuilder request = client.prepareSearch(indics)
            // search type: QUERY_AND_FETCH, QUERY_THEN_FETCH,
            // DFS_QUERY_AND_FETCH or DFS_QUERY_THEN_FETCH
            .setSearchType(SearchType.DEFAULT)
            // termQuery(field, value): field name in the doc, value to match exactly
            .setQuery(QueryBuilders.termQuery("author", "apache"));
    showResult(request.get());
}
/**
 * 2. Prefix ("fuzzy") query: matches documents whose field starts with the given text.
 */
@Test
public void testSearch3() {
    SearchRequestBuilder request = client.prepareSearch(indics)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.prefixQuery("name", "h"));
    showResult(request.get());
}
/**
 * 3. Paged range query against the "bank" index: age in (25, 35].
 *
 * Paging: page N with pageSize rows starts at offset (N - 1) * pageSize,
 * e.g. page 4 with 10 rows per page -> setFrom(30), setSize(10).
 */
@Test
public void testSearch4() {
    // NOTE: QUERY_THEN_FETCH and QUERY_AND_FETCH return different hit counts:
    // the former defaults to 10 hits, the latter returns size * shards (50 with 5 shards).
    SearchRequestBuilder request = client.prepareSearch(indics)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            .setFrom(0)  // paging: offset of the first hit
            .setSize(5); // paging: hits per page
    showResult(request.get());
}
/**
 * 4. Highlighted query.
 *
 * "apache" may occur in author, url or name:
 *   OR semantics  -> boolQuery().should(...)
 *   AND semantics -> boolQuery().must(...)
 *   NOT semantics -> boolQuery().mustNot(...)
 * A match/term query compares the keyword against the field content.
 */
@Test
public void testSearch5() {
    SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DEFAULT)
            // .setQuery(QueryBuilders.multiMatchQuery("apache", "author", "url"))
            // .setQuery(QueryBuilders.regexpQuery("url", ".*apache.*"))
            // .setQuery(QueryBuilders.termQuery("author", "apache"))
            .setQuery(QueryBuilders.boolQuery()
                    .should(QueryBuilders.regexpQuery("url", ".*apache.*"))
                    .should(QueryBuilders.termQuery("author", "apache")))
            // Highlighting: wrap every match in the given pre/post tags.
            .setHighlighterPreTags("<span color='blue' size='18px'>")
            .setHighlighterPostTags("</span>")
            // Fields that should carry highlight fragments.
            .addHighlightedField("author")
            .addHighlightedField("url")
            .get();
    SearchHits searchHits = response.getHits();
    System.out.println("maxScore: " + searchHits.getMaxScore());   // highest document score
    System.out.println("totalHits: " + searchHits.getTotalHits()); // total matching docs
    SearchHit[] hits = searchHits.getHits();
    System.out.println("當(dāng)前返回結(jié)果記錄條數(shù):" + hits.length);
    for (SearchHit hit : hits) {
        System.out.println("========================================================");
        Map<String, HighlightField> highlightFields = hit.getHighlightFields();
        for (Map.Entry<String, HighlightField> me : highlightFields.entrySet()) {
            System.out.println("--------------------------------------");
            HighlightField highlightField = me.getValue();
            System.out.println("key: " + me.getKey() + ", name: " + highlightField.getName());
            // Join all highlight fragments; use a StringBuilder rather than
            // String concatenation inside the loop (avoids O(n^2) copies).
            StringBuilder value = new StringBuilder();
            for (Text text : highlightField.fragments()) {
                value.append(text.toString());
            }
            System.out.println("value: " + value);
        }
    }
}
/**
 * 5. Sorted query: order the result set by balance, highest first.
 */
@Test
public void testSearch6() {
    // NOTE: QUERY_THEN_FETCH and QUERY_AND_FETCH return different hit counts:
    // the former defaults to 10 hits, the latter returns size * shards (50 with 5 shards).
    SearchRequestBuilder request = client.prepareSearch(indics)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            .addSort("balance", SortOrder.DESC)
            .setFrom(0)  // paging: offset of the first hit
            .setSize(5); // paging: hits per page
    showResult(request.get());
}
/**
 * 6. Aggregation query: average and maximum of "balance" for age in (25, 35].
 */
@Test
public void testSearch7() {
    indics = new String[]{"bank"};
    SearchResponse response = client.prepareSearch(indics)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            // Like SQL "select avg(balance) as avg_balance from ...":
            // the first argument is the alias under which the result is returned.
            .addAggregation(AggregationBuilders.avg("avg_balance").field("balance"))
            .addAggregation(AggregationBuilders.max("max").field("balance"))
            .get();
    // The response carries one aggregation per alias, e.g.
    // "aggregations": { "max": {"value": 49741.0}, "avg_balance": {"value": 25142.13...} }
    // Each aggregation exposes its result through the "value" property.
    for (Aggregation aggregation : response.getAggregations().asList()) {
        System.out.println("========================================");
        System.out.println("name: " + aggregation.getName());
        System.out.println(aggregation.getProperty("value"));
    }
}
如果我們的數(shù)據(jù)包含中文,而在查詢時(shí)希望可以支持對(duì)中文進(jìn)行分詞搜索,那么ES本身依賴于Lucene的分詞對(duì)中文就不佳了,這時(shí)就可以考慮使用其它分詞方法,如這里要說明的IK中文分詞,其集成到ES的步驟如下:
1)下載地址:
https://github.com/medcl/elasticsearch-analysis-ik
2)使用maven對(duì)源代碼進(jìn)行編譯(mvn clean install -DskipTests)(package)
3)把編譯后的target/releases下的zip文件拷貝到 ES_HOME/plugins/analysis-ik目錄下面,然后解壓
4)把下載的ik插件中的conf/ik目錄拷貝到ES_HOME/config下
5)修改ES_HOME/config/elasticsearch.yml文件,添加index.analysis.analyzer.default.type: ik
(把IK設(shè)置為默認(rèn)分詞器,這一步是可選的)
6)重啟es服務(wù)
7)測(cè)試分詞效果
需要說明的是,數(shù)據(jù)需要重新插入,并使用ik分詞,即需要重新構(gòu)建期望使用中文分詞IK的索引庫(kù)。
測(cè)試代碼如下:
package cn.xpleaf.bigdata.elasticsearch;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
/**
 * Exercises the ES cluster through the Java API.
 * A TransportClient is the client-side handle that communicates with the cluster.
 * <p>
 * Uses prepareSearch to run full-text queries that rely on
 * Chinese word segmentation (IK analyzer).
 */
public class ElasticSearchTest3 {
// Shared ES transport client, connected in setUp() before every test.
private TransportClient client;
private String index = "bigdata";
private String type = "product";
// Index built with the IK Chinese analyzer; searched by the tests below.
private String[] indics = {"chinese"};
@Before
public void setUp() throws UnknownHostException {
// cluster.name is mandatory when joining an existing ES cluster.
Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
client = TransportClient.builder().settings(settings).build();
TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
client.addTransportAddresses(ta1, ta2, ta3);
}
/**
 * Chinese word-segmentation queries (observed results against the "chinese" index):
 * 1. prefix "中"       -> 2 hits
 * 2. prefix "中國(guó)"    -> 0 hits
 * 3. contains "爛"     -> 1 hit
 * 4. contains "爛攤子" -> 0 hits
 *
 * Why segmentation matters: a phrase such as "中華人民共和國(guó)" can be segmented
 * many ways (中華 / 人民 / 共和國(guó) / 中華人民 / 人民共和國(guó) / 華人 / 共和), so a
 * prefix query on a multi-character phrase may miss documents unless the index
 * was analyzed accordingly.
 * Dedicated Chinese analyzers include: PaoDing (庖丁解牛), IK, Sogou (搜狗).
 */
@Test
public void testSearch2() {
SearchResponse response = client.prepareSearch(indics) // indices to search
.setSearchType(SearchType.DEFAULT) // search type: QUERY_AND_FETCH / QUERY_THEN_FETCH / DFS_QUERY_AND_FETCH / DFS_QUERY_THEN_FETCH
//.setQuery(QueryBuilders.prefixQuery("content", "爛攤子"))// query to execute; prefixQuery(field, value) matches docs whose field starts with value
// .setQuery(QueryBuilders.regexpQuery("content", ".*爛攤子.*"))
.setQuery(QueryBuilders.prefixQuery("content", "中國(guó)"))
.get();
showResult(response);
}
/**
 * Pretty-prints a search response: summary statistics first, then every hit.
 * @param response the search response to dump
 */
private void showResult(SearchResponse response) {
SearchHits searchHits = response.getHits();
float maxScore = searchHits.getMaxScore(); // highest document score in the result set
System.out.println("maxScore: " + maxScore);
long totalHits = searchHits.getTotalHits(); // total number of matching documents
System.out.println("totalHits: " + totalHits);
SearchHit[] hits = searchHits.getHits(); // hits returned in this page
System.out.println("當(dāng)前返回結(jié)果記錄條數(shù):" + hits.length);
for (SearchHit hit : hits) {
long version = hit.version();
String id = hit.getId();
String index = hit.getIndex();
String type = hit.getType();
float score = hit.getScore();
System.out.println("===================================================");
String source = hit.getSourceAsString();
System.out.println("version: " + version);
System.out.println("id: " + id);
System.out.println("index: " + index);
System.out.println("type: " + type);
System.out.println("score: " + score);
System.out.println("source: " + source);
}
}
@After
public void cleanUp() {
// Release the client's network resources after each test.
client.close();
}
}
相關(guān)測(cè)試代碼已上傳到GitHub:https://github.com/xpleaf/elasticsearch-study
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。