zhouhao 3 سال پیش
والد
کامیت
34cb265917

+ 41 - 18
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java

@@ -3,7 +3,6 @@ package org.jetlinks.community.elastic.search.service.reactive;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import io.vavr.Function3;
 import lombok.Generated;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -91,6 +90,7 @@ import org.springframework.data.elasticsearch.client.ClientLogger;
 import org.springframework.data.elasticsearch.client.ElasticsearchHost;
 import org.springframework.data.elasticsearch.client.NoReachableHostException;
 import org.springframework.data.elasticsearch.client.reactive.HostProvider;
+import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
 import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
 import org.springframework.data.elasticsearch.client.util.NamedXContents;
 import org.springframework.data.elasticsearch.client.util.RequestConverters;
@@ -112,6 +112,7 @@ import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
+import reactor.function.Function3;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -129,7 +130,7 @@ import static org.springframework.data.elasticsearch.client.util.RequestConverte
 
 @Slf4j
 @Generated
-public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster {
+public class DefaultReactiveElasticsearchClient implements org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient, ReactiveElasticsearchClient.Cluster {
     private final HostProvider<?> hostProvider;
     private final RequestCreator requestCreator;
     private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
@@ -442,7 +443,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     @Override
     public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
-        return null;
+        return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers)
+            .next()
+            .map(ByQueryResponse::of);
     }
 
     static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
@@ -664,7 +667,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     @Override
     public Mono<Boolean> createIndex(HttpHeaders headers, org.elasticsearch.client.indices.CreateIndexRequest createIndexRequest) {
-        return null;
+        return sendRequest(createIndexRequest, requestCreator.createIndexRequest(), AcknowledgedResponse.class, headers)
+            .map(AcknowledgedResponse::isAcknowledged)
+            .next();
     }
 
     /*
@@ -712,12 +717,16 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     @Override
     public Mono<Boolean> putMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
-        return null;
+        return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers)
+            .map(AcknowledgedResponse::isAcknowledged)
+            .next();
     }
 
     @Override
     public Mono<Boolean> putMapping(HttpHeaders headers, org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
-        return null;
+        return sendRequest(putMappingRequest, requestCreator.putMappingRequest(), AcknowledgedResponse.class, headers)
+            .map(AcknowledgedResponse::isAcknowledged)
+            .next();
     }
 
     /*
@@ -1168,52 +1177,66 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 
     @Override
     public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) {
-        return null;
+        return sendRequest(getMappingsRequest, requestCreator.getMapping(),
+                           GetMappingsResponse.class, headers).next();
     }
 
     @Override
     public Mono<org.elasticsearch.client.indices.GetMappingsResponse> getMapping(HttpHeaders headers, org.elasticsearch.client.indices.GetMappingsRequest getMappingsRequest) {
-        return null;
+        return sendRequest(getMappingsRequest, requestCreator.getMappingRequest(), org.elasticsearch.client.indices.GetMappingsResponse.class, headers) //
+                                                                                                                                                        .next();
     }
 
     @Override
-    public Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers, GetFieldMappingsRequest getFieldMappingsRequest) {
-        return null;
+    public Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers,
+                                                          GetFieldMappingsRequest getFieldMappingsRequest) {
+        return sendRequest(getFieldMappingsRequest, requestCreator.getFieldMapping(), GetFieldMappingsResponse.class,
+                           headers).next();
     }
 
     @Override
     public Mono<Boolean> updateAliases(HttpHeaders headers, IndicesAliasesRequest indicesAliasesRequest) {
-        return null;
+        return sendRequest(indicesAliasesRequest, requestCreator.updateAlias(), AcknowledgedResponse.class, headers)
+            .map(AcknowledgedResponse::isAcknowledged).next();
     }
 
     @Override
     public Mono<GetAliasesResponse> getAliases(HttpHeaders headers, GetAliasesRequest getAliasesRequest) {
-        return null;
+        return sendRequest(getAliasesRequest, requestCreator.getAlias(), GetAliasesResponse.class, headers).next();
     }
 
     @Override
     public Mono<Boolean> putTemplate(HttpHeaders headers, org.elasticsearch.client.indices.PutIndexTemplateRequest putIndexTemplateRequest) {
-        return null;
+        return sendRequest(putIndexTemplateRequest, requestCreator.putTemplate(), AcknowledgedResponse.class, headers)
+            .map(AcknowledgedResponse::isAcknowledged).next();
     }
 
     @Override
-    public Mono<org.elasticsearch.client.indices.GetIndexTemplatesResponse> getTemplate(HttpHeaders headers, org.elasticsearch.client.indices.GetIndexTemplatesRequest getIndexTemplatesRequest) {
-        return null;
+    public Mono<org.elasticsearch.client.indices.GetIndexTemplatesResponse> getTemplate(HttpHeaders headers,
+                                                                                        org.elasticsearch.client.indices.GetIndexTemplatesRequest getIndexTemplatesRequest) {
+        return (sendRequest(getIndexTemplatesRequest, requestCreator.getTemplates(), org.elasticsearch.client.indices.GetIndexTemplatesResponse.class,
+                            headers)).next();
     }
 
     @Override
     public Mono<Boolean> existsTemplate(HttpHeaders headers, IndexTemplatesExistRequest indexTemplatesExistRequest) {
-        return null;
+        return sendRequest(indexTemplatesExistRequest, requestCreator.templatesExist(),
+                           RawActionResponse.class, headers)
+            .flatMap(response -> response
+                .releaseBody()
+                .thenReturn(response.statusCode().is2xxSuccessful()))
+            .next();
     }
 
     @Override
     public Mono<Boolean> deleteTemplate(HttpHeaders headers, DeleteIndexTemplateRequest deleteIndexTemplateRequest) {
-        return null;
+        return sendRequest(deleteIndexTemplateRequest, requestCreator.deleteTemplate(), AcknowledgedResponse.class, headers)
+            .map(AcknowledgedResponse::isAcknowledged).next();
     }
 
     @Override
     public Mono<GetIndexResponse> getIndex(HttpHeaders headers, org.elasticsearch.client.indices.GetIndexRequest getIndexRequest) {
-        return null;
+        return sendRequest(getIndexRequest, requestCreator.getIndex(), GetIndexResponse.class, headers).next();
     }
 
     Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) {

+ 25 - 19
jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java

@@ -1,7 +1,6 @@
 package org.jetlinks.community.notify.email.embedded;
 
 import com.alibaba.fastjson.JSONObject;
-import io.vavr.control.Try;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -11,14 +10,17 @@ import org.hswebframework.web.id.IDGenerator;
 import org.hswebframework.web.utils.ExpressionUtils;
 import org.hswebframework.web.utils.TemplateParser;
 import org.hswebframework.web.validator.ValidatorUtils;
+import org.jetlinks.core.Values;
 import org.jetlinks.community.notify.*;
 import org.jetlinks.community.notify.email.EmailProvider;
 import org.jetlinks.community.notify.template.TemplateManager;
-import org.jetlinks.core.Values;
 import org.jsoup.Jsoup;
 import org.jsoup.nodes.Document;
 import org.jsoup.nodes.Element;
-import org.springframework.core.io.*;
+import org.springframework.core.io.ByteArrayResource;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.core.io.InputStreamSource;
+import org.springframework.core.io.Resource;
 import org.springframework.http.MediaType;
 import org.springframework.mail.javamail.JavaMailSender;
 import org.springframework.mail.javamail.JavaMailSenderImpl;
@@ -33,8 +35,6 @@ import reactor.core.scheduler.Schedulers;
 import javax.annotation.Nonnull;
 import javax.mail.internet.MimeMessage;
 import javax.mail.internet.MimeUtility;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
@@ -131,17 +131,24 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
                 return Flux
                     .fromIterable(template.getAttachments().entrySet())
                     .flatMap(entry -> Mono.zip(Mono.just(entry.getKey()), convertResource(entry.getValue())))
-                    .doOnNext(tp -> Try
-                        .run(() -> helper.addAttachment(MimeUtility.encodeText(tp.getT1()), tp.getT2())).get())
-                    .then(Flux
-                              .fromIterable(template.getImages().entrySet())
-                              .flatMap(entry -> Mono.zip(Mono.just(entry.getKey()), convertResource(entry.getValue())))
-                              .doOnNext(tp -> Try
-                                  .run(() -> helper.addInline(tp.getT1(), tp.getT2(), MediaType.APPLICATION_OCTET_STREAM_VALUE))
-                                  .get())
-                              .then()
-                    ).thenReturn(mimeMessage)
-                    ;
+                    .flatMap(tp2 -> Mono
+                        .fromCallable(() -> {
+                            //添加附件
+                            helper.addAttachment(MimeUtility.encodeText(tp2.getT1()), tp2.getT2());
+                            return helper;
+                        }))
+                    .then(
+                        Flux.fromIterable(template.getImages().entrySet())
+                            .flatMap(entry -> Mono.zip(Mono.just(entry.getKey()), convertResource(entry.getValue())))
+                            .flatMap(tp2 -> Mono
+                                .fromCallable(() -> {
+                                    //添加图片资源
+                                    helper.addInline(tp2.getT1(), tp2.getT2(), MediaType.APPLICATION_OCTET_STREAM_VALUE);
+                                    return helper;
+                                }))
+                            .then()
+                    )
+                    .thenReturn(mimeMessage);
 
             })
             .flatMap(Function.identity())
@@ -159,8 +166,7 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
                 .get()
                 .uri(resource)
                 .accept(MediaType.APPLICATION_OCTET_STREAM)
-                .exchange()
-                .flatMap(rep -> rep.bodyToMono(Resource.class));
+                .exchangeToMono(res->res.bodyToMono(Resource.class));
         } else if (resource.startsWith("data:") && resource.contains(";base64,")) {
             String base64 = resource.substring(resource.indexOf(";base64,") + 8);
             return Mono.just(
@@ -186,7 +192,7 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
         if (StringUtils.isEmpty(subject) || StringUtils.isEmpty(text)) {
             throw new BusinessException("模板内容错误,text 或者 subject 不能为空.");
         }
-        String sendText = render(text, context, true);
+        String sendText = render(text, context,true);
         List<EmailTemplate.Attachment> tempAttachments = template.getAttachments();
         Map<String, String> attachments = new HashMap<>();