博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ElasticSearch Document API
阅读量:4623 次
发布时间:2019-06-09

本文共 18783 字,大约阅读时间需要 62 分钟。

 

 

 

 

 

 

 

 

 

 

 

 

删除索引库

 

 可以看到id为1的索引库不见了

 

 

 

 

 

 

 

 

 这里要修改下配置文件

 

 

 slave1,slave2也做同样的操作,在这里就不多赘述了。

 

这个时候记得要重启elasticseach才能生效,怎么重启这里就不多说了

运行程序

 

 

 

 

这个函数的意思是如果文件存在就更新,不存在就创建

第一次执行下来

 

 

 

 第二次执行(因为文件已经存在了,所以就把里面的内容更新)

 

 

 

 这个是批量操作,来获取多条索引

 

 

 

 

 添加两个删除一个

 

1 public void test13() throws IOException, InterruptedException, 2             ExecutionException { 3          4         BulkProcessor bulkProcessor = BulkProcessor.builder( 5                 client,   6                 new BulkProcessor.Listener() { 7                      8                     public void beforeBulk(long executionId, BulkRequest request) { 9                         // TODO Auto-generated method stub10                         System.out.println(request.numberOfActions());11                     }12                     13                     public void afterBulk(long executionId, BulkRequest request,14                             Throwable failure) {15                         // TODO Auto-generated method stub16                         System.out.println(failure.getMessage());17                     }18                     19                     public void afterBulk(long executionId, BulkRequest request,20                             BulkResponse response) {21                         // TODO Auto-generated method stub22                         System.out.println(response.hasFailures());23                     }24                 })25                 .setBulkActions(1000) // 每个批次的最大数量26                 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数27                 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔28                 .setConcurrentRequests(1) //设置多少个并发处理线程29                 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作30                 .setBackoffPolicy(31                     BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 32                 .build();33         String json = "{" +34                 "\"user\":\"kimchy\"," +35                 "\"postDate\":\"2013-01-30\"," +36                 "\"message\":\"trying out Elasticsearch\"" +37             "}";38         39         for (int i = 0; i < 1000; i++) {40             bulkProcessor.add(new IndexRequest("djt6", "user").source(json));41         }42         //阻塞至所有的请求线程处理完毕后,断开连接资源43         bulkProcessor.awaitClose(3, TimeUnit.MINUTES);44         client.close();45     }46     /**47      * SearchType使用方式48      * @throws Exception49      */50     @Test51     public void test14() throws Exception {52         SearchResponse response = client.prepareSearch("djt")  53                 .setTypes("user")  54                 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 55                 .setSearchType(SearchType.QUERY_AND_FETCH)56                 .execute()  57                 .actionGet();  58         SearchHits hits = response.getHits();59         System.out.println(hits.getTotalHits());60     }61 }

 

这个是批量插入

 

这里有1000个,我就不数了

 

参考代码ESTestDocumentAPI.java

1 package com.dajiangtai.djt_spider.elasticsearch;  2   3 import java.io.IOException;  4 import java.net.InetAddress;  5 import java.net.UnknownHostException;  6 import java.util.Date;  7 import java.util.HashMap;  8 import java.util.Iterator;  9 import java.util.List; 10 import java.util.Map; 11 import java.util.concurrent.ExecutionException; 12 import java.util.concurrent.TimeUnit; 13 import static org.elasticsearch.node.NodeBuilder.*; 14 import static org.elasticsearch.common.xcontent.XContentFactory.*; 15 import org.elasticsearch.action.bulk.BackoffPolicy; 16 import org.elasticsearch.action.bulk.BulkProcessor; 17 import org.elasticsearch.common.unit.ByteSizeUnit; 18 import org.elasticsearch.common.unit.ByteSizeValue; 19 import org.elasticsearch.common.unit.TimeValue; 20 import org.codehaus.jackson.map.ObjectMapper; 21 import org.elasticsearch.action.bulk.BulkItemResponse; 22 import org.elasticsearch.action.bulk.BulkRequest; 23 import org.elasticsearch.action.bulk.BulkRequestBuilder; 24 import org.elasticsearch.action.bulk.BulkResponse; 25 import org.elasticsearch.action.delete.DeleteRequestBuilder; 26 import org.elasticsearch.action.delete.DeleteResponse; 27 import org.elasticsearch.action.get.GetResponse; 28 import org.elasticsearch.action.get.MultiGetItemResponse; 29 import org.elasticsearch.action.get.MultiGetResponse; 30 import org.elasticsearch.action.index.IndexRequest; 31 import org.elasticsearch.action.index.IndexRequestBuilder; 32 import org.elasticsearch.action.index.IndexResponse; 33 import org.elasticsearch.action.search.SearchResponse; 34 import org.elasticsearch.action.search.SearchType; 35 import org.elasticsearch.action.update.UpdateRequest; 36 import org.elasticsearch.client.Client; 37 import org.elasticsearch.client.transport.TransportClient; 38 import org.elasticsearch.cluster.node.DiscoveryNode; 39 import org.elasticsearch.common.settings.Settings; 40 import org.elasticsearch.common.transport.InetSocketTransportAddress; 41 import org.elasticsearch.index.query.QueryBuilders; 42 import org.elasticsearch.node.Node; 43 import org.elasticsearch.script.Script; 44 import org.elasticsearch.script.ScriptService; 45 import org.elasticsearch.search.SearchHits; 46 import org.junit.Before; 47 import org.junit.Test; 48  49 /** 50  * Document API 操作 51  *  52  * @author 大讲台 53  *  54  */ 55 public class ESTestDocumentAPI { 56     private TransportClient client; 57  58     @Before 59     public void test0() throws UnknownHostException { 60  61         // 开启client.transport.sniff功能,探测集群所有节点 62         Settings settings = Settings.settingsBuilder() 63                 .put("cluster.name", "escluster") 64                 .put("client.transport.sniff", true).build(); 65         // on startup 66         // 获取TransportClient 67         client = TransportClient 68                 .builder() 69                 .settings(settings) 70                 .build() 71                 .addTransportAddress( 72                         new InetSocketTransportAddress(InetAddress 73                                 .getByName("master"), 9300)) 74                 .addTransportAddress( 75                         new InetSocketTransportAddress(InetAddress 76                                 .getByName("slave1"), 9300)) 77                 .addTransportAddress( 78                         new InetSocketTransportAddress(InetAddress 79                                 .getByName("slave2"), 9300)); 80     } 81  82     /** 83      * 创建索引:use ElasticSearch helpers 84      *  85      * @throws IOException 86      */ 87     @Test 88     public void test1() throws IOException { 89         IndexResponse response = client 90                 .prepareIndex("twitter", "tweet", "1") 91                 .setSource( 92                         jsonBuilder().startObject().field("user", "kimchy") 93                                 .field("postDate", new Date()) 94                                 .field("message", "trying out Elasticsearch") 95                                 .endObject()).get(); 96         System.out.println(response.getId()); 97         client.close(); 98     } 99 100     /**101      * 创建索引:do it yourself102      * 103      * @throws IOException104      */105     @Test106     public void test2() throws IOException {107         String json = "{" + "\"user\":\"kimchy\","108                 + "\"postDate\":\"2013-01-30\","109                 + "\"message\":\"trying out Elasticsearch\"" + "}";110         IndexResponse response = client.prepareIndex("twitter", "tweet")111                 .setSource(json).get();112         System.out.println(response.getId());113         client.close();114     }115 116     /**117      * 创建索引:use map118      * 119      * @throws IOException120      */121     @Test122     public void test3() throws IOException {123         Map
json = new HashMap
();124 json.put("user", "kimchy");125 json.put("postDate", new Date());126 json.put("message", "trying out Elasticsearch");127 128 IndexResponse response = client.prepareIndex("twitter", "tweet")129 .setSource(json).get();130 System.out.println(response.getId());131 client.close();132 }133 134 /**135 * 创建索引:serialize your beans136 * 137 * @throws IOException138 */139 @Test140 public void test4() throws IOException {141 User user = new User();142 user.setUser("kimchy");143 user.setPostDate(new Date());144 user.setMessage("trying out Elasticsearch");145 146 // instance a json mapper147 ObjectMapper mapper = new ObjectMapper(); // create once, reuse148 149 // generate json150 byte[] json = mapper.writeValueAsBytes(user);151 152 IndexResponse response = client.prepareIndex("twitter", "tweet")153 .setSource(json).get();154 System.out.println(response.getId());155 client.close();156 }157 158 /**159 * 查询索引:get160 * 161 * @throws IOException162 */163 @Test164 public void test5() throws IOException {165 GetResponse response = client.prepareGet("twitter", "tweet", "1").get();166 System.out.println(response.getSourceAsString());167 168 client.close();169 }170 171 /**172 * 删除索引:delete173 * 174 * @throws IOException175 */176 @Test177 public void test6() throws IOException {178 client.prepareDelete("twitter", "tweet", "1").get();179 client.close();180 }181 182 /**183 * 更新索引:Update API-UpdateRequest184 * 185 * @throws IOException186 * @throws ExecutionException187 * @throws InterruptedException188 */189 @Test190 public void test7() throws IOException, InterruptedException,191 ExecutionException {192 UpdateRequest updateRequest = new UpdateRequest();193 updateRequest.index("twitter");194 updateRequest.type("tweet");195 updateRequest.id("AVyi3OORot7zkId708s8");196 updateRequest.doc(jsonBuilder().startObject().field("gender", "male")197 .endObject());198 client.update(updateRequest).get();199 System.out.println(updateRequest.version());200 client.close();201 }202 203 /**204 * 更新索引:Update API-prepareUpdate()-doc205 * 206 * @throws IOException207 * @throws ExecutionException208 * @throws InterruptedException209 */210 @Test211 public void test8() throws IOException, InterruptedException,212 ExecutionException {213 client.prepareUpdate("twitter", "tweet", "AVyikSKIot7zkId708s6")214 .setDoc(jsonBuilder().startObject().field("gender", "female")215 .endObject()).get();216 client.close();217 }218 219 /**220 * 更新索引:Update API-prepareUpdate()-script221 * 需要开启:script.engine.groovy.inline.update: on222 * 223 * @throws IOException224 * @throws ExecutionException225 * @throws InterruptedException226 */227 @Test228 public void test9() throws IOException, InterruptedException,229 ExecutionException {230 client.prepareUpdate("twitter", "tweet", "AVyi4oZfot7zkId708s-")231 .setScript(232 new Script("ctx._source.gender = \"female\"",233 ScriptService.ScriptType.INLINE, null, null))234 .get();235 client.close();236 }237 238 /**239 * 更新索引:Update API-UpdateRequest-upsert240 * 241 * @throws IOException242 * @throws ExecutionException243 * @throws InterruptedException244 */245 @Test246 public void test10() throws IOException, InterruptedException,247 ExecutionException {248 IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1")249 .source(jsonBuilder()250 .startObject()251 .field("name", "Joe Smith")252 .field("gender", "male")253 .endObject());254 UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")255 .doc(jsonBuilder()256 .startObject()257 .field("gender", "female")258 .endObject()).upsert(indexRequest);259 client.update(updateRequest).get();260 client.close();261 }262 263 /**264 * 批量查询索引:Multi Get API265 * 266 * @throws IOException267 * @throws ExecutionException268 * @throws InterruptedException269 */270 @Test271 public void test11() throws IOException, InterruptedException,272 ExecutionException {273 MultiGetResponse multiGetItemResponses = client.prepareMultiGet()274 .add("twitter", "tweet", "1") 275 .add("twitter", "tweet", "AVyi4oZfot7zkId708s-", "AVyi3OORot7zkId708s8", "AVyikSKIot7zkId708s6") 276 .add("djt2", "user", "1") 277 .get();278 279 for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 280 GetResponse response = itemResponse.getResponse();281 if (response.isExists()) { 282 String json = response.getSourceAsString(); 283 System.out.println(json);284 }285 }286 client.close();287 }288 289 /**290 * 批量操作索引:Bulk API291 * 292 * @throws IOException293 * @throws ExecutionException294 * @throws InterruptedException295 */296 @Test297 public void test12() throws IOException, InterruptedException,298 ExecutionException {299 BulkRequestBuilder bulkRequest = client.prepareBulk();300 301 // either use client#prepare, or use Requests# to directly build index/delete requests302 bulkRequest.add(client.prepareIndex("twitter", "tweet", "3")303 .setSource(jsonBuilder()304 .startObject()305 .field("user", "kimchy")306 .field("postDate", new Date())307 .field("message", "trying out Elasticsearch")308 .endObject()309 )310 );311 312 bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")313 .setSource(jsonBuilder()314 .startObject()315 .field("user", "kimchy")316 .field("postDate", new Date())317 .field("message", "another post")318 .endObject()319 )320 );321 DeleteRequestBuilder prepareDelete = client.prepareDelete("twitter", "tweet", "AVyikSKIot7zkId708s6");322 bulkRequest.add(prepareDelete);323 324 325 BulkResponse bulkResponse = bulkRequest.get();326 //批量操作:其中一个操作失败不影响其他操作成功执行327 if (bulkResponse.hasFailures()) {328 // process failures by iterating through each bulk response item329 BulkItemResponse[] items = bulkResponse.getItems();330 for (BulkItemResponse bulkItemResponse : items) {331 System.out.println(bulkItemResponse.getFailureMessage());332 }333 }else{334 System.out.println("bulk process success!");335 }336 client.close();337 }338 339 /**340 * 批量操作索引:Using Bulk Processor341 * 优化:先关闭副本,再添加副本,提升效率342 * @throws IOException343 * @throws ExecutionException344 * @throws InterruptedException345 */346 @Test347 public void test13() throws IOException, InterruptedException,348 ExecutionException {349 350 BulkProcessor bulkProcessor = BulkProcessor.builder(351 client, 352 new BulkProcessor.Listener() {353 354 public void beforeBulk(long executionId, BulkRequest request) {355 // TODO Auto-generated method stub356 System.out.println(request.numberOfActions());357 }358 359 public void afterBulk(long executionId, BulkRequest request,360 Throwable failure) {361 // TODO Auto-generated method stub362 System.out.println(failure.getMessage());363 }364 365 public void afterBulk(long executionId, BulkRequest request,366 BulkResponse response) {367 // TODO Auto-generated method stub368 System.out.println(response.hasFailures());369 }370 })371 .setBulkActions(1000) // 每个批次的最大数量372 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数373 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔374 .setConcurrentRequests(1) //设置多少个并发处理线程375 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作376 .setBackoffPolicy(377 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 378 .build();379 String json = "{" +380 "\"user\":\"kimchy\"," +381 "\"postDate\":\"2013-01-30\"," +382 "\"message\":\"trying out Elasticsearch\"" +383 "}";384 385 for (int i = 0; i < 1000; i++) {386 bulkProcessor.add(new IndexRequest("djt6", "user").source(json));387 }388 //阻塞至所有的请求线程处理完毕后,断开连接资源389 bulkProcessor.awaitClose(3, TimeUnit.MINUTES);390 client.close();391 }392 /**393 * SearchType使用方式394 * @throws Exception395 */396 @Test397 public void test14() throws Exception {398 SearchResponse response = client.prepareSearch("djt") 399 .setTypes("user") 400 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 401 .setSearchType(SearchType.QUERY_AND_FETCH)402 .execute() 403 .actionGet(); 404 SearchHits hits = response.getHits();405 System.out.println(hits.getTotalHits());406 }407 }

 

转载于:https://www.cnblogs.com/braveym/p/7011119.html

你可能感兴趣的文章
HDU3602 2012【dp】
查看>>
SocketExceptioon关于客户端只可以收发一次的问题
查看>>
js中数组常用方法
查看>>
09_数组 ...
查看>>
Unity Shader——Writing Surface Shaders(0)
查看>>
Chatper 10-Using DDL Statements to Create and Manage Tables
查看>>
在网页中插入百度地图
查看>>
iOS 开发之Block
查看>>
详解Swift和OC的混编
查看>>
为什么C++函数形参默认值从最末一个赋值?
查看>>
Lua string文件类型判断和内容解析
查看>>
MyBatis的动态SQL详解
查看>>
android - 多屏幕适配相关
查看>>
Fedora Linux 18 延期至年底
查看>>
Spring Framework 3.2 RC1 发布
查看>>
基于ios开发点餐系统应用(附带源码)
查看>>
Xenia and Weights(深度优先搜索)
查看>>
文件包含漏洞进阶篇
查看>>
JavaScript的self和this使用小结
查看>>
CSS3.0:透明度 Opacity
查看>>