Quellcode durchsuchen

增加内嵌elasticsearch

zhouhao vor 5 Jahren
Ursprung
Commit
d5cdff3a9b

+ 16 - 0
jetlinks-components/elasticsearch-component/pom.xml

@@ -13,6 +13,22 @@
     <artifactId>elasticsearch-component</artifactId>
 
     <dependencies>
+
+        <dependency><!-- required by elasticsearch -->
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>

+ 148 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/elasticsearch/common/logging/Loggers.java

@@ -0,0 +1,148 @@
+package org.elasticsearch.common.logging;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+
+import static org.elasticsearch.common.util.CollectionUtils.asArrayList;
+
+/**
+ * A set of utilities around Logging.
+ */
+public class Loggers {
+
+    public static final String SPACE = " ";
+
+    public static final Setting<Level> LOG_DEFAULT_LEVEL_SETTING =
+        new Setting<>("logger.level", Level.INFO.name(), Level::valueOf, Setting.Property.NodeScope);
+    public static final Setting.AffixSetting<Level> LOG_LEVEL_SETTING =
+        Setting.prefixKeySetting("logger.", (key) -> new Setting<>(key, Level.INFO.name(), Level::valueOf, Setting.Property.Dynamic,
+            Setting.Property.NodeScope));
+
+    public static Logger getLogger(Class<?> clazz, ShardId shardId, String... prefixes) {
+        return getLogger(clazz, shardId.getIndex(), asArrayList(Integer.toString(shardId.id()), prefixes).toArray(new String[0]));
+    }
+
+    /**
+     * Just like {@link #getLogger(Class, ShardId, String...)} but String loggerName instead of
+     * Class and no extra prefixes.
+     */
+    public static Logger getLogger(String loggerName, ShardId shardId) {
+        String prefix = formatPrefix(shardId.getIndexName(), Integer.toString(shardId.id()));
+        return new PrefixLogger(LogManager.getLogger(loggerName), prefix);
+    }
+
+    public static Logger getLogger(Class<?> clazz, Index index, String... prefixes) {
+        return getLogger(clazz, asArrayList(Loggers.SPACE, index.getName(), prefixes).toArray(new String[0]));
+    }
+
+    public static Logger getLogger(Class<?> clazz, String... prefixes) {
+        return new PrefixLogger(LogManager.getLogger(clazz), formatPrefix(prefixes));
+    }
+
+    public static Logger getLogger(Logger parentLogger, String s) {
+        Logger inner = LogManager.getLogger(parentLogger.getName() + s);
+        if (parentLogger instanceof PrefixLogger) {
+            return new PrefixLogger(inner, ((PrefixLogger)parentLogger).prefix());
+        }
+        return inner;
+    }
+
+    private static String formatPrefix(String... prefixes) {
+        String prefix = null;
+        if (prefixes != null && prefixes.length > 0) {
+            StringBuilder sb = new StringBuilder();
+            for (String prefixX : prefixes) {
+                if (prefixX != null) {
+                    if (prefixX.equals(SPACE)) {
+                        sb.append(" ");
+                    } else {
+                        sb.append("[").append(prefixX).append("]");
+                    }
+                }
+            }
+            if (sb.length() > 0) {
+                prefix = sb.toString();
+            }
+        }
+        return prefix;
+    }
+
+    /**
+     * Set the level of the logger. If the new level is null, the logger will inherit it's level from its nearest ancestor with a non-null
+     * level.
+     */
+    public static void setLevel(Logger logger, String level) {
+        final Level l;
+        if (level == null) {
+            l = null;
+        } else {
+            l = Level.valueOf(level);
+        }
+        setLevel(logger, l);
+    }
+
+    public static void setLevel(Logger logger, Level level) {
+//        if (!LogManager.ROOT_LOGGER_NAME.equals(logger.getName())) {
+//
+//            Configurator.setLevel(logger.getName(), level);
+//        } else {
+//
+//            final LoggerContext ctx = LoggerContext.getContext(false);
+//            final Configuration config = ctx.getConfiguration();
+//            final LoggerConfig loggerConfig = config.getLoggerConfig(logger.getName());
+//            loggerConfig.setLevel(level);
+//            ctx.updateLoggers();
+//        }
+//
+//        // we have to descend the hierarchy
+//        final LoggerContext ctx = LoggerContext.getContext(false);
+//        for (final LoggerConfig loggerConfig : ctx.getConfiguration().getLoggers().values()) {
+//            if (LogManager.ROOT_LOGGER_NAME.equals(logger.getName()) || loggerConfig.getName().startsWith(logger.getName() + ".")) {
+//                Configurator.setLevel(loggerConfig.getName(), level);
+//            }
+//        }
+    }
+
+    public static void addAppender(final Logger logger, final Appender appender) {
+//        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
+//        final Configuration config = ctx.getConfiguration();
+//        config.addAppender(appender);
+//        LoggerConfig loggerConfig = config.getLoggerConfig(logger.getName());
+//        if (!logger.getName().equals(loggerConfig.getName())) {
+//            loggerConfig = new LoggerConfig(logger.getName(), logger.getLevel(), true);
+//            config.addLogger(logger.getName(), loggerConfig);
+//        }
+//        loggerConfig.addAppender(appender, null, null);
+//        ctx.updateLoggers();
+    }
+
+    public static void removeAppender(final Logger logger, final Appender appender) {
+//        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
+//        final Configuration config = ctx.getConfiguration();
+//        LoggerConfig loggerConfig = config.getLoggerConfig(logger.getName());
+//        if (!logger.getName().equals(loggerConfig.getName())) {
+//            loggerConfig = new LoggerConfig(logger.getName(), logger.getLevel(), true);
+//            config.addLogger(logger.getName(), loggerConfig);
+//        }
+//        loggerConfig.removeAppender(appender.getName());
+//        ctx.updateLoggers();
+    }
+
+    public static Appender findAppender(final Logger logger, final Class<? extends Appender> clazz) {
+//        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
+//        final Configuration config = ctx.getConfiguration();
+//        final LoggerConfig loggerConfig = config.getLoggerConfig(logger.getName());
+//        for (final Map.Entry<String, Appender> entry : loggerConfig.getAppenders().entrySet()) {
+//            if (entry.getValue().getClass().equals(clazz)) {
+//                return entry.getValue();
+//            }
+//        }
+        return null;
+    }
+
+}

+ 17 - 1
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java

@@ -1,9 +1,12 @@
 package org.jetlinks.community.elastic.search.configuration;
 
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.jetlinks.community.elastic.search.ElasticRestClient;
+import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearch;
+import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties;
 import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -12,21 +15,34 @@ import org.springframework.context.annotation.Configuration;
 
 /**
  * @author bsetfeng
+ * @author zhouhao
  * @since 1.0
  **/
 @Configuration
 @Slf4j
-@EnableConfigurationProperties({ElasticSearchProperties.class, ElasticSearchIndexProperties.class})
+@EnableConfigurationProperties({
+    ElasticSearchProperties.class,
+    EmbeddedElasticSearchProperties.class,
+    ElasticSearchIndexProperties.class})
 public class ElasticSearchConfiguration {
 
     @Autowired
     private ElasticSearchProperties properties;
 
+    @Autowired
+    private EmbeddedElasticSearchProperties embeddedElasticSearchProperties;
+
     @Bean
+    @SneakyThrows
     public ElasticRestClient elasticRestClient() {
+        if (embeddedElasticSearchProperties.isEnabled()) {
+            new EmbeddedElasticSearch(embeddedElasticSearchProperties)
+                .start();
+        }
         RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(properties.createHosts())
             .setRequestConfigCallback(properties::applyRequestConfigBuilder)
             .setHttpClientConfigCallback(properties::applyHttpAsyncClientBuilder));
         return new ElasticRestClient(client, client);
     }
+
 }

+ 46 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java

@@ -0,0 +1,46 @@
+package org.jetlinks.community.elastic.search.embedded;
+
+import lombok.SneakyThrows;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.util.Collections;
+
+public class EmbeddedElasticSearch extends Node {
+
+    static {
+        System.setProperty("es.set.netty.runtime.available.processors","false");
+    }
+    @SneakyThrows
+    public EmbeddedElasticSearch(EmbeddedElasticSearchProperties properties) {
+        super(InternalSettingsPreparer.prepareEnvironment(
+            properties.applySetting(
+                Settings.builder()
+                    .put("node.name", "test")
+                    .put("discovery.type", "single-node")
+                    .put("transport.type", "netty4")
+                    .put("http.type", "netty4")
+                    .put("network.host", "0.0.0.0")
+                    .put("http.port", 9200)
+            ).build(), null),
+            Collections.singleton(Netty4Plugin.class), false);
+    }
+
+    @Override
+    protected void registerDerivedNodeNameWithLogger(String nodeName) {
+
+    }
+
+    @SneakyThrows
+    public void doStart() {
+        start();
+    }
+
+    @SneakyThrows
+    public void shutdown() {
+        close();
+    }
+
+}

+ 30 - 0
jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearchProperties.java

@@ -0,0 +1,30 @@
+package org.jetlinks.community.elastic.search.embedded;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.elasticsearch.common.settings.Settings;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "elasticsearch.embedded")
+@Getter
+@Setter
+public class EmbeddedElasticSearchProperties {
+
+    private boolean enabled;
+
+    private String dataPath = "./data/elasticsearch";
+
+    private String homePath = "./";
+
+    private int port = 9200;
+
+    private String host = "0.0.0.0";
+
+
+    public Settings.Builder applySetting(Settings.Builder settings) {
+        return settings.put("network.host", host)
+            .put("http.port", 9200)
+            .put("path.data", dataPath)
+            .put("path.home", homePath);
+    }
+}

+ 6 - 0
jetlinks-standalone/src/main/resources/application.yml

@@ -33,6 +33,11 @@ easyorm:
   default-schema: public # 数据库默认的schema
   dialect: postgres #数据库方言
 elasticsearch:
+  embedded:
+    enabled: true # 为true时使用内嵌的elasticsearch,不建议在生产环境中使用
+    data-path: ./data/elasticsearch
+    port: 9200
+    host: 0.0.0.0
   client:
     host: localhost
     port: 9200
@@ -100,6 +105,7 @@ logging:
     org.jetlinks.rule.engine: warn
     org.jetlinks.gateway: debug
     org.springframework: warn
+    org.elasticsearch: error
 vertx:
   max-event-loop-execute-time-unit: seconds
   max-event-loop-execute-time: 30