1.什么是Hazelcast Jet?
Hazelcast Jet 让您可以编写专注于数据转换的现代 Java 代码,而让数据在集群节点之间流动、执行计算等繁重工作则全部由 Jet 负责。它既支持处理有界(批处理)数据,也支持处理无界(流式)数据。Jet 很好地解决了以下问题:
纵向扩展和横向扩展:跨所有 CPU 核心和集群节点并行计算
自动伸缩:自动将计算扩展到新加入的节点,并在节点离开或发生故障时自动恢复
正确性保证:即使发生节点故障,也能提供至少一次(at-least-once)和恰好一次(exactly-once)的处理语义
Jet 与许多流行的数据存储系统(如 Apache Kafka、Hadoop、关系数据库、消息队列等)集成。Jet 支持多种数据转换,例如窗口聚合。例如,如果您的数据是来自数百万用户的 GPS 位置报告,Jet 只需使用滑动窗口和几行代码即可计算出每个用户的速度矢量。Jet 还附带功能齐全的内存键值存储,您可以用它来缓存结果、存储参考数据,或者直接将其作为数据源。
2.代码工程
实验目标
实现一个单词计数(Word Count)作业(job)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Module POM for the Hazelcast Jet demo, inheriting from the springboot-demo parent.
         The Spring Boot dependencies below declare no version; presumably the parent POM
         supplies them via dependency management. TODO confirm against the parent. -->
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hazelcast-jet</artifactId>
    <properties>
        <!-- Compile sources and emit bytecode for Java 8. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Web stack for the @RestController endpoints (/hello, /submitJob, /wordCount). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Hazelcast Jet Spring Boot integration. This is presumably what provides the
             JetInstance bean that the controller and WordCount autowire; verify. -->
        <dependency>
            <groupId>com.hazelcast.jet.contrib</groupId>
            <artifactId>hazelcast-jet-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>
controller
package com.et.jet.controller; import com.et.jet.DemoApplication; import com.et.jet.example.WordCount; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.test.TestSources; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController public class HelloWorldController { @Autowired JetInstance instance; @RequestMapping("/hello") public Map<String, Object> showHelloWorld(){ Map<String, Object> map = new HashMap<>(); map.put("msg", "HelloWorld"); return map; } @RequestMapping("/submitJob") public void submitJob() { Pipeline pipeline = Pipeline.create(); pipeline.readFrom(TestSources.items("foo", "bar")) .writeTo(Sinks.logger()); JobConfig jobConfig = new JobConfig() .addClass(HelloWorldController.class); instance.newJob(pipeline, jobConfig).join(); } @Autowired WordCount wordCount; @RequestMapping("/wordCount") public void wordCount() { wordCount.go(); } }
WordCount
Pipeline
构成了 Jet 应用程序的基本结构。pipeline内的处理遵循以下步骤:
从源读取数据
转换数据
将数据写入接收器
本例展示:先将文本文件逐行加载到名为 bookLines 的 IMap 中;pipeline 再从该 IMap 读取数据,经过分词、分组和聚合转换后,最后将每个单词的计数写入另一个名为 counts 的 IMap。
package com.et.jet.example; import com.hazelcast.jet.Jet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.Sources; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import static com.hazelcast.function.Functions.wholeItem; import static com.hazelcast.jet.Traversers.traverseArray; import static com.hazelcast.jet.aggregate.AggregateOperations.counting; import static java.util.Comparator.comparingLong; /** * Demonstrates a simple Word Count job in the Pipeline API. Inserts the * text of The Complete Works of William Shakespeare into a Hazelcast * IMap, then lets Jet count the words in it and write its findings to * another IMap. The example looks at Jet's output and prints the 100 most * frequent words. */ @Component public class WordCount { private static final String BOOK_LINES = "bookLines"; private static final String COUNTS = "counts"; String filepath ="D:/tmp/shakespeare-complete-works.txt"; @Autowired private JetInstance jet; private static Pipeline buildPipeline() { Pattern delimiter = Pattern.compile("\\W+"); Pipeline p = Pipeline.create(); p.readFrom(Sources.<Long, String>map(BOOK_LINES)) .flatMap(e -> traverseArray(delimiter.split(e.getValue().toLowerCase()))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(COUNTS)); return p; } public static void main(String[] args) throws Exception { new WordCount().go(); } /** * This code illustrates a few more things about Jet, new in 0.5. See comments. */ public void go() { try { setup(); System.out.println("\nCounting words... 
"); long start = System.nanoTime(); Pipeline p = buildPipeline(); jet.newJob(p).join(); System.out.println("done in " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " milliseconds."); Map<String, Long> results = jet.getMap(COUNTS); printResults(results); } finally { Jet.shutdownAll(); } } private void setup() { //jet = Jet.bootstrappedInstance(); System.out.println("Loading The Complete Works of William Shakespeare"); try { long[] lineNum = {0}; Map<Long, String> bookLines = new HashMap<>(); InputStream stream = new FileInputStream(filepath); try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { reader.lines().forEach(line -> bookLines.put(++lineNum[0], line)); } jet.getMap(BOOK_LINES).putAll(bookLines); } catch (IOException e) { throw new RuntimeException(e); } } private static void printResults(Map<String, Long> counts) { final int limit = 100; StringBuilder sb = new StringBuilder(String.format(" Top %d entries are:%n", limit)); sb.append("/-------+---------\\\n"); sb.append("| Count | Word |\n"); sb.append("|-------+---------|\n"); counts.entrySet().stream() .sorted(comparingLong(Map.Entry<String, Long>::getValue).reversed()) .limit(limit) .forEach(e -> sb.append(String.format("|%6d | %-8s|%n", e.getValue(), e.getKey()))); sb.append("\\-------+---------/\n"); System.out.println(sb.toString()); } }
以上只是部分关键代码,完整代码请参见下面的代码仓库。
代码仓库
https://github.com/Harries/springboot-demo(Hazelcast Jet)
3.测试
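启动 Spring Boot 应用(启动前请先将 shakespeare-complete-works.txt 放到 WordCount 中 filepath 指定的位置,即 D:/tmp/shakespeare-complete-works.txt)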
访问http://127.0.0.1:8088/wordCount
查看控制台输出日志
还没有评论,来说两句吧...