|
@@ -55,4 +55,41 @@ class ScriptPayloadParserBuilderTest {
|
|
|
.verifyComplete();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ void testDirect() {
|
|
|
+ ScriptPayloadParserBuilder builder = new ScriptPayloadParserBuilder();
|
|
|
+ Map<String, Object> config = new HashMap<>();
|
|
|
+ config.put("script", "\n" +
|
|
|
+ "var cache = parser.newBuffer();\n" +
|
|
|
+ "parser.direct(function(buffer){\n" +
|
|
|
+ " cache.appendBuffer(buffer);\n" +
|
|
|
+ " if(cache.length()>=16){\n" +
|
|
|
+ " var result = cache;\n" +
|
|
|
+ " cache = parser.newBuffer(); \n" +
|
|
|
+ " parser.result(result)\n" +
|
|
|
+ " .complete(); \n" +
|
|
|
+ " }\n" +
|
|
|
+ " return null;\n" +
|
|
|
+ " });");
|
|
|
+ config.put("lang", "javascript");
|
|
|
+ System.out.println(config.get("script"));
|
|
|
+ PayloadParser parser = builder.build(ValueObject.of(config));
|
|
|
+
|
|
|
+ parser.handlePayload()
|
|
|
+ .doOnSubscribe(sb -> {
|
|
|
+ Mono.delay(Duration.ofMillis(100))
|
|
|
+ .subscribe(r -> {
|
|
|
+ parser.handle(Buffer.buffer(new byte[]{0, 1, 2, 3}));
|
|
|
+ parser.handle(Buffer.buffer(new byte[]{0, 1, 2, 3}));
|
|
|
+ parser.handle(Buffer.buffer(new byte[]{0, 1, 2, 3}));
|
|
|
+ parser.handle(Buffer.buffer(new byte[]{0, 1, 2, 3}));
|
|
|
+ });
|
|
|
+ })
|
|
|
+ .take(1)
|
|
|
+ .map(Buffer::length)
|
|
|
+ .as(StepVerifier::create)
|
|
|
+ .expectNext(16)
|
|
|
+ .verifyComplete();
|
|
|
+ }
|
|
|
+
|
|
|
}
|