|
@@ -179,7 +179,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
.zipWith(indexManager.getIndexMetadata(index))
|
|
.zipWith(indexManager.getIndexMetadata(index))
|
|
.flatMapMany(tp2 ->
|
|
.flatMapMany(tp2 ->
|
|
group.map(buffer -> {
|
|
group.map(buffer -> {
|
|
- IndexRequest request = new IndexRequest(tp2.getT1(),"_doc");
|
|
|
|
|
|
+ IndexRequest request = new IndexRequest(tp2.getT1(), "_doc");
|
|
Object o = JSON.toJSON(buffer.getPayload());
|
|
Object o = JSON.toJSON(buffer.getPayload());
|
|
if (o instanceof Map) {
|
|
if (o instanceof Map) {
|
|
request.source(tp2.getT2().convertToElastic((Map<String, Object>) o));
|
|
request.source(tp2.getT2().convertToElastic((Map<String, Object>) o));
|
|
@@ -194,9 +194,9 @@ public class DefaultElasticSearchService implements ElasticSearchService {
|
|
.flatMap(lst -> {
|
|
.flatMap(lst -> {
|
|
BulkRequest request = new BulkRequest();
|
|
BulkRequest request = new BulkRequest();
|
|
lst.forEach(request::add);
|
|
lst.forEach(request::add);
|
|
- return ReactorActionListener.<BulkResponse>mono(listener -> {
|
|
|
|
- restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener);
|
|
|
|
- });
|
|
|
|
|
|
+ return ReactorActionListener.<BulkResponse>mono(listener ->
|
|
|
|
+ restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)
|
|
|
|
+ );
|
|
}).thenReturn(buffers.size());
|
|
}).thenReturn(buffers.size());
|
|
}
|
|
}
|
|
|
|
|