删除索引库
可以看到id为1的索引库不见了
这里要修改下配置文件
slave1,slave2也做同样的操作,在这里就不多赘述了。
这个时候记得要重启elasticseach才能生效,怎么重启这里就不多说了
运行程序
这个函数的意思是如果文件存在就更新,不存在就创建
第一次执行下来
第二次执行(因为文件已经存在了,所以就把里面的内容更新)
这个是批量操作,来获取多条索引
添加两个删除一个
1 public void test13() throws IOException, InterruptedException, 2 ExecutionException { 3 4 BulkProcessor bulkProcessor = BulkProcessor.builder( 5 client, 6 new BulkProcessor.Listener() { 7 8 public void beforeBulk(long executionId, BulkRequest request) { 9 // TODO Auto-generated method stub10 System.out.println(request.numberOfActions());11 }12 13 public void afterBulk(long executionId, BulkRequest request,14 Throwable failure) {15 // TODO Auto-generated method stub16 System.out.println(failure.getMessage());17 }18 19 public void afterBulk(long executionId, BulkRequest request,20 BulkResponse response) {21 // TODO Auto-generated method stub22 System.out.println(response.hasFailures());23 }24 })25 .setBulkActions(1000) // 每个批次的最大数量26 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数27 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔28 .setConcurrentRequests(1) //设置多少个并发处理线程29 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作30 .setBackoffPolicy(31 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 32 .build();33 String json = "{" +34 "\"user\":\"kimchy\"," +35 "\"postDate\":\"2013-01-30\"," +36 "\"message\":\"trying out Elasticsearch\"" +37 "}";38 39 for (int i = 0; i < 1000; i++) {40 bulkProcessor.add(new IndexRequest("djt6", "user").source(json));41 }42 //阻塞至所有的请求线程处理完毕后,断开连接资源43 bulkProcessor.awaitClose(3, TimeUnit.MINUTES);44 client.close();45 }46 /**47 * SearchType使用方式48 * @throws Exception49 */50 @Test51 public void test14() throws Exception {52 SearchResponse response = client.prepareSearch("djt") 53 .setTypes("user") 54 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 55 .setSearchType(SearchType.QUERY_AND_FETCH)56 .execute() 57 .actionGet(); 58 SearchHits hits = response.getHits();59 System.out.println(hits.getTotalHits());60 }61 }
这个是批量插入
这里有1000个,我就不数了
参考代码ESTestDocumentAPI.java
1 package com.dajiangtai.djt_spider.elasticsearch; 2 3 import java.io.IOException; 4 import java.net.InetAddress; 5 import java.net.UnknownHostException; 6 import java.util.Date; 7 import java.util.HashMap; 8 import java.util.Iterator; 9 import java.util.List; 10 import java.util.Map; 11 import java.util.concurrent.ExecutionException; 12 import java.util.concurrent.TimeUnit; 13 import static org.elasticsearch.node.NodeBuilder.*; 14 import static org.elasticsearch.common.xcontent.XContentFactory.*; 15 import org.elasticsearch.action.bulk.BackoffPolicy; 16 import org.elasticsearch.action.bulk.BulkProcessor; 17 import org.elasticsearch.common.unit.ByteSizeUnit; 18 import org.elasticsearch.common.unit.ByteSizeValue; 19 import org.elasticsearch.common.unit.TimeValue; 20 import org.codehaus.jackson.map.ObjectMapper; 21 import org.elasticsearch.action.bulk.BulkItemResponse; 22 import org.elasticsearch.action.bulk.BulkRequest; 23 import org.elasticsearch.action.bulk.BulkRequestBuilder; 24 import org.elasticsearch.action.bulk.BulkResponse; 25 import org.elasticsearch.action.delete.DeleteRequestBuilder; 26 import org.elasticsearch.action.delete.DeleteResponse; 27 import org.elasticsearch.action.get.GetResponse; 28 import org.elasticsearch.action.get.MultiGetItemResponse; 29 import org.elasticsearch.action.get.MultiGetResponse; 30 import org.elasticsearch.action.index.IndexRequest; 31 import org.elasticsearch.action.index.IndexRequestBuilder; 32 import org.elasticsearch.action.index.IndexResponse; 33 import org.elasticsearch.action.search.SearchResponse; 34 import org.elasticsearch.action.search.SearchType; 35 import org.elasticsearch.action.update.UpdateRequest; 36 import org.elasticsearch.client.Client; 37 import org.elasticsearch.client.transport.TransportClient; 38 import org.elasticsearch.cluster.node.DiscoveryNode; 39 import org.elasticsearch.common.settings.Settings; 40 import org.elasticsearch.common.transport.InetSocketTransportAddress; 41 import org.elasticsearch.index.query.QueryBuilders; 42 import org.elasticsearch.node.Node; 43 import org.elasticsearch.script.Script; 44 import org.elasticsearch.script.ScriptService; 45 import org.elasticsearch.search.SearchHits; 46 import org.junit.Before; 47 import org.junit.Test; 48 49 /** 50 * Document API 操作 51 * 52 * @author 大讲台 53 * 54 */ 55 public class ESTestDocumentAPI { 56 private TransportClient client; 57 58 @Before 59 public void test0() throws UnknownHostException { 60 61 // 开启client.transport.sniff功能,探测集群所有节点 62 Settings settings = Settings.settingsBuilder() 63 .put("cluster.name", "escluster") 64 .put("client.transport.sniff", true).build(); 65 // on startup 66 // 获取TransportClient 67 client = TransportClient 68 .builder() 69 .settings(settings) 70 .build() 71 .addTransportAddress( 72 new InetSocketTransportAddress(InetAddress 73 .getByName("master"), 9300)) 74 .addTransportAddress( 75 new InetSocketTransportAddress(InetAddress 76 .getByName("slave1"), 9300)) 77 .addTransportAddress( 78 new InetSocketTransportAddress(InetAddress 79 .getByName("slave2"), 9300)); 80 } 81 82 /** 83 * 创建索引:use ElasticSearch helpers 84 * 85 * @throws IOException 86 */ 87 @Test 88 public void test1() throws IOException { 89 IndexResponse response = client 90 .prepareIndex("twitter", "tweet", "1") 91 .setSource( 92 jsonBuilder().startObject().field("user", "kimchy") 93 .field("postDate", new Date()) 94 .field("message", "trying out Elasticsearch") 95 .endObject()).get(); 96 System.out.println(response.getId()); 97 client.close(); 98 } 99 100 /**101 * 创建索引:do it yourself102 * 103 * @throws IOException104 */105 @Test106 public void test2() throws IOException {107 String json = "{" + "\"user\":\"kimchy\","108 + "\"postDate\":\"2013-01-30\","109 + "\"message\":\"trying out Elasticsearch\"" + "}";110 IndexResponse response = client.prepareIndex("twitter", "tweet")111 .setSource(json).get();112 System.out.println(response.getId());113 client.close();114 }115 116 /**117 * 创建索引:use map118 * 119 * @throws IOException120 */121 @Test122 public void test3() throws IOException {123 Mapjson = new HashMap ();124 json.put("user", "kimchy");125 json.put("postDate", new Date());126 json.put("message", "trying out Elasticsearch");127 128 IndexResponse response = client.prepareIndex("twitter", "tweet")129 .setSource(json).get();130 System.out.println(response.getId());131 client.close();132 }133 134 /**135 * 创建索引:serialize your beans136 * 137 * @throws IOException138 */139 @Test140 public void test4() throws IOException {141 User user = new User();142 user.setUser("kimchy");143 user.setPostDate(new Date());144 user.setMessage("trying out Elasticsearch");145 146 // instance a json mapper147 ObjectMapper mapper = new ObjectMapper(); // create once, reuse148 149 // generate json150 byte[] json = mapper.writeValueAsBytes(user);151 152 IndexResponse response = client.prepareIndex("twitter", "tweet")153 .setSource(json).get();154 System.out.println(response.getId());155 client.close();156 }157 158 /**159 * 查询索引:get160 * 161 * @throws IOException162 */163 @Test164 public void test5() throws IOException {165 GetResponse response = client.prepareGet("twitter", "tweet", "1").get();166 System.out.println(response.getSourceAsString());167 168 client.close();169 }170 171 /**172 * 删除索引:delete173 * 174 * @throws IOException175 */176 @Test177 public void test6() throws IOException {178 client.prepareDelete("twitter", "tweet", "1").get();179 client.close();180 }181 182 /**183 * 更新索引:Update API-UpdateRequest184 * 185 * @throws IOException186 * @throws ExecutionException187 * @throws InterruptedException188 */189 @Test190 public void test7() throws IOException, InterruptedException,191 ExecutionException {192 UpdateRequest updateRequest = new UpdateRequest();193 updateRequest.index("twitter");194 updateRequest.type("tweet");195 updateRequest.id("AVyi3OORot7zkId708s8");196 updateRequest.doc(jsonBuilder().startObject().field("gender", "male")197 .endObject());198 client.update(updateRequest).get();199 System.out.println(updateRequest.version());200 client.close();201 }202 203 /**204 * 更新索引:Update API-prepareUpdate()-doc205 * 206 * @throws IOException207 * @throws ExecutionException208 * @throws InterruptedException209 */210 @Test211 public void test8() throws IOException, InterruptedException,212 ExecutionException {213 client.prepareUpdate("twitter", "tweet", "AVyikSKIot7zkId708s6")214 .setDoc(jsonBuilder().startObject().field("gender", "female")215 .endObject()).get();216 client.close();217 }218 219 /**220 * 更新索引:Update API-prepareUpdate()-script221 * 需要开启:script.engine.groovy.inline.update: on222 * 223 * @throws IOException224 * @throws ExecutionException225 * @throws InterruptedException226 */227 @Test228 public void test9() throws IOException, InterruptedException,229 ExecutionException {230 client.prepareUpdate("twitter", "tweet", "AVyi4oZfot7zkId708s-")231 .setScript(232 new Script("ctx._source.gender = \"female\"",233 ScriptService.ScriptType.INLINE, null, null))234 .get();235 client.close();236 }237 238 /**239 * 更新索引:Update API-UpdateRequest-upsert240 * 241 * @throws IOException242 * @throws ExecutionException243 * @throws InterruptedException244 */245 @Test246 public void test10() throws IOException, InterruptedException,247 ExecutionException {248 IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1")249 .source(jsonBuilder()250 .startObject()251 .field("name", "Joe Smith")252 .field("gender", "male")253 .endObject());254 UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")255 .doc(jsonBuilder()256 .startObject()257 .field("gender", "female")258 .endObject()).upsert(indexRequest);259 client.update(updateRequest).get();260 client.close();261 }262 263 /**264 * 批量查询索引:Multi Get API265 * 266 * @throws IOException267 * @throws ExecutionException268 * @throws InterruptedException269 */270 @Test271 public void test11() throws IOException, InterruptedException,272 ExecutionException {273 MultiGetResponse multiGetItemResponses = client.prepareMultiGet()274 .add("twitter", "tweet", "1") 275 .add("twitter", "tweet", "AVyi4oZfot7zkId708s-", "AVyi3OORot7zkId708s8", "AVyikSKIot7zkId708s6") 276 .add("djt2", "user", "1") 277 .get();278 279 for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 280 GetResponse response = itemResponse.getResponse();281 if (response.isExists()) { 282 String json = response.getSourceAsString(); 283 System.out.println(json);284 }285 }286 client.close();287 }288 289 /**290 * 批量操作索引:Bulk API291 * 292 * @throws IOException293 * @throws ExecutionException294 * @throws InterruptedException295 */296 @Test297 public void test12() throws IOException, InterruptedException,298 ExecutionException {299 BulkRequestBuilder bulkRequest = client.prepareBulk();300 301 // either use client#prepare, or use Requests# to directly build index/delete requests302 bulkRequest.add(client.prepareIndex("twitter", "tweet", "3")303 .setSource(jsonBuilder()304 .startObject()305 .field("user", "kimchy")306 .field("postDate", new Date())307 .field("message", "trying out Elasticsearch")308 .endObject()309 )310 );311 312 bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")313 .setSource(jsonBuilder()314 .startObject()315 .field("user", "kimchy")316 .field("postDate", new Date())317 .field("message", "another post")318 .endObject()319 )320 );321 DeleteRequestBuilder prepareDelete = client.prepareDelete("twitter", "tweet", "AVyikSKIot7zkId708s6");322 bulkRequest.add(prepareDelete);323 324 325 BulkResponse bulkResponse = bulkRequest.get();326 //批量操作:其中一个操作失败不影响其他操作成功执行327 if (bulkResponse.hasFailures()) {328 // process failures by iterating through each bulk response item329 BulkItemResponse[] items = bulkResponse.getItems();330 for (BulkItemResponse bulkItemResponse : items) {331 System.out.println(bulkItemResponse.getFailureMessage());332 }333 }else{334 System.out.println("bulk process success!");335 }336 client.close();337 }338 339 /**340 * 批量操作索引:Using Bulk Processor341 * 优化:先关闭副本,再添加副本,提升效率342 * @throws IOException343 * @throws ExecutionException344 * @throws InterruptedException345 */346 @Test347 public void test13() throws IOException, InterruptedException,348 ExecutionException {349 350 BulkProcessor bulkProcessor = BulkProcessor.builder(351 client, 352 new BulkProcessor.Listener() {353 354 public void beforeBulk(long executionId, BulkRequest request) {355 // TODO Auto-generated method stub356 System.out.println(request.numberOfActions());357 }358 359 public void afterBulk(long executionId, BulkRequest request,360 Throwable failure) {361 // TODO Auto-generated method stub362 System.out.println(failure.getMessage());363 }364 365 public void afterBulk(long executionId, BulkRequest request,366 BulkResponse response) {367 // TODO Auto-generated method stub368 System.out.println(response.hasFailures());369 }370 })371 .setBulkActions(1000) // 每个批次的最大数量372 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数373 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔374 .setConcurrentRequests(1) //设置多少个并发处理线程375 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作376 .setBackoffPolicy(377 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 378 .build();379 String json = "{" +380 "\"user\":\"kimchy\"," +381 "\"postDate\":\"2013-01-30\"," +382 "\"message\":\"trying out Elasticsearch\"" +383 "}";384 385 for (int i = 0; i < 1000; i++) {386 bulkProcessor.add(new IndexRequest("djt6", "user").source(json));387 }388 //阻塞至所有的请求线程处理完毕后,断开连接资源389 bulkProcessor.awaitClose(3, TimeUnit.MINUTES);390 client.close();391 }392 /**393 * SearchType使用方式394 * @throws Exception395 */396 @Test397 public void test14() throws Exception {398 SearchResponse response = client.prepareSearch("djt") 399 .setTypes("user") 400 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 401 .setSearchType(SearchType.QUERY_AND_FETCH)402 .execute() 403 .actionGet(); 404 SearchHits hits = response.getHits();405 System.out.println(hits.getTotalHits());406 }407 }