1.什么是Akka Cluster?
Akka Cluster
将多个JVM连接整合在一起,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序,部署到很多JVM上去实现程序的分布式并行运算(单机也可以起很多节点构成集群)。更重要的是, Akka Cluster
集群构建与Actor编程没有直接的联系,集群构建是在ActorSystem层面上,实现了Actor消息地址的透明化,无需考虑目标运行环节是否分布式,可以按照正常的Actor编程模式进行开发。 我们知道,分布式集群是由若干节点组成的,那么节点的发现及状态管理是分布式系统一个比较重要的任务。Akka Cluster
中将节点的生命周期划分为:
joining - 当尝试加入集群时的初始状态
up - 加入集群后的正常状态
leaving / exiting - 节点退出集群时的中间状态
down - 集群无法感知某节点后,将其标记为down
removed - 从集群中被删除,以后也无法再加入集群
其实当参数akka.cluster.allow-weakly-up-members
启用时(默认是启用的),还有个weakly up
,它是用于集群出现分裂时,集群无法收敛,则leader无法将状态置为up的临时状态。这个后面再解释。 图中还有两个特殊的名词:
fd* - 这个表示akka的错误检测机制
Faiulre Detector
被触发后,将节点标记为unreachableunreachable* -
unreachable
不是一个真正的节点状态,更多的像是一个flag,用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时,会移除这个flag。
市面上大多数产品的分布式管理一般用的是注册中心机制,例如zk、consul或etcd。其实是节点把自己的信息注册到所使用的注册中心里,而master通过接受注册中心的通知得知新节点信息。显然本质上是一种master/slave的架构。这种架构有两个问题:
master节点一般是单一的,一旦挂了影响就比较大(所以很多master都采用了HA机制),也就是所谓的系统单点故障;
通常节点的地址发现是要走master去获取的,当系统并发大时,master节点就可能成为性能瓶颈,即单点性能瓶颈。
Akka
可能就是考虑这两点,采用了P2P的模式,这样任何一个节点都可以作为”master”,任何的节点都可以用来寻找其他节点地址。那它是怎么做到的呢?答案是Gossip协议和CRDT
。这里不做过多解释,感兴趣的话可以自己去翻阅相关介绍
2.代码工程
实验目的
搭建一个简单akka custer集群
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>akka</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Streams --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Actor dependency --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Remote dependency --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Cluster dependency --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster-typed_2.13</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
cluster
node1.conf
akka { actor { provider = "cluster" # 这里设置集群模式 } remote { artery { canonical.hostname = "127.0.0.1" canonical.port = 2551 # 对应节点的端口,node1 使用 2551,node2 使用 2552 } } cluster { seed-nodes = [ "akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552" ] } }
node2.conf
akka { actor { provider = "cluster" } remote { artery { canonical.hostname = "127.0.0.1" canonical.port = 2552 # 第二个节点使用的端口 } } cluster { seed-nodes = [ "akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552" ] } }
集群监听器
package com.et.akka.cluster; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.typed.Cluster; import akka.cluster.typed.Subscribe; import akka.cluster.ClusterEvent; public class ClusterListener extends AbstractBehavior<ClusterEvent.ClusterDomainEvent> { public ClusterListener(ActorContext<ClusterEvent.ClusterDomainEvent> context) { super(context); Cluster cluster = Cluster.get(context.getSystem()); cluster.subscriptions().tell(Subscribe.create(getContext().getSelf(), ClusterEvent.ClusterDomainEvent.class)); } @Override public Receive<ClusterEvent.ClusterDomainEvent> createReceive() { return newReceiveBuilder() .onMessage(ClusterEvent.MemberUp.class, this::onMemberUp) .onMessage(ClusterEvent.MemberRemoved.class, this::onMemberRemoved) .onAnyMessage(event -> { System.out.println("Received cluster event: " + event); return this; }) .build(); } private Behavior<ClusterEvent.ClusterDomainEvent> onMemberUp(ClusterEvent.MemberUp memberUp) { System.out.println("Member is Up: " + memberUp.member()); return this; } private Behavior<ClusterEvent.ClusterDomainEvent> onMemberRemoved(ClusterEvent.MemberRemoved memberRemoved) { System.out.println("Member is Removed: " + memberRemoved.member()); return this; } public static Behavior<ClusterEvent.ClusterDomainEvent> create() { return Behaviors.setup(ClusterListener::new); } }
启动集群
package com.et.akka.cluster; import akka.actor.typed.ActorSystem; import akka.cluster.ClusterEvent; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.io.File; public class ClusterApp { public static void main(String[] args) { // 读取外部配置文件 node1.conf 启动第一个节点 Config configNode1 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node1.conf")) .withFallback(ConfigFactory.load()); ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode1 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode1); System.out.println("Node 1 started with config from node1.conf"); // 读取外部配置文件 node2.conf 启动第二个节点 Config configNode2 = ConfigFactory.parseFile(new File("D:/IdeaProjects/ETFramework/akka/src/main/resources/node2.conf")) .withFallback(ConfigFactory.load()); ActorSystem<ClusterEvent.ClusterDomainEvent> systemNode2 = ActorSystem.create(ClusterListener.create(), "ClusterSystem", configNode2); System.out.println("Node 2 started with config from node2.conf"); } }
以上只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
https://github.com/Harries/springboot-demo(akka)
3.测试
启动集群(执行ClusterApp里面的main方法),查看日志可以看到2个节点都起来了
还没有评论,来说两句吧...