1.什么是Akka remoting?
Akka-Remoting
一种ActorSystem之间Actor对Actor点对点的沟通协议.通过Akka-Remoting来实现一个ActorSystem中的一个Actor与另一个ActorSystem中的另一个Actor之间的沟通
Akka Remoting限制:
不支持NAT(Network Address Translation)
不支持负载均衡器(Load Balancers)
Akka提供了种方式来使用Remoting功能:
通过调用actorSelection方法搜索一个actor,该方法输入的参数的模式为:akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
通过actorOf方法创建一个actor
下面看一下Remoting系统中故障恢复模型(Failure Recovery Model),如下图所示:
上图中,连接到一个远程系统的过程中,包括上面4种状态:在进行任何通信之前,系统处于Idle状态;当第一次一个消息尝试向远程系统发送,或者当远程系统连接过来,这时系统状态变为Active;当两个系统通信失败,连接丢失,这时系统变为Gated状态;当系统通信过程中,由于参与通信的系统的状态不一致导致系统无法恢复,这时远程系统变为Quarantined状态,只有重新启动系统才能恢复,重启后系统变为Active状态。
Akka Remoting的功能
Akka
将remoting
完全配置化了,使用时几乎只需要修改配置文件,除非自定义,否则不需要动一行代码。 remoting
包提供了两个功能:
查找一个已存在的远程Actor
在指定的远程路径上创建一个远程Actor
2.代码管理
实验目标
实现客户端和服务端双向通信
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>akka</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Streams --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Actor dependency --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor-typed_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Remote dependency --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Cluster dependency --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster-typed_2.13</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
server
server.conf(放在src/main/resources目录下)
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { artery { enabled = on transport = tcp canonical.hostname = "127.0.0.1" canonical.port = 2552 } } }
package com.et.akka.remoting; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import com.typesafe.config.ConfigFactory; public class ServerApp { static class ServerActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match(String.class, msg -> { System.out.println("Server received message: " + msg); getSender().tell("Hello from Server", getSelf()); }) .build(); } } public static void main(String[] args) { ActorSystem system = ActorSystem.create("RemoteSystem", ConfigFactory.load("server")); ActorRef serverActor = system.actorOf(Props.create(ServerActor.class), "serverActor"); System.out.println("Server is running..."); } }
client
client.conf(放在src/main/resources目录下)
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { artery { enabled = on transport = tcp canonical.hostname = "127.0.0.1" canonical.port = 0 } } }
package com.et.akka.remoting; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import com.typesafe.config.ConfigFactory; public class ClientApp { static class ClientActor extends AbstractActor { private final String serverActorPath; public ClientActor(String serverActorPath) { this.serverActorPath = serverActorPath; } @Override public void preStart() { ActorSelection serverActor = getContext().actorSelection(serverActorPath); serverActor.tell("Hello from Client", getSelf()); } @Override public Receive createReceive() { return receiveBuilder() .match(String.class, msg -> { System.out.println("Client received message: " + msg); }) .build(); } } public static void main(String[] args) { ActorSystem system = ActorSystem.create("RemoteSystem", ConfigFactory.load("client")); String serverPath = "akka://RemoteSystem@127.0.0.1:2552/user/serverActor"; ActorRef clientActor = system.actorOf(Props.create(ClientActor.class, serverPath), "clientActor"); System.out.println("Client is running..."); } }
以上只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
https://github.com/Harries/springboot-demo(akka)
3.测试
启动server
启动client
产看日志
server
Server is running... Server received message: Hello from Client
client
Client is running... Client received message: Hello from Serve
还没有评论,来说两句吧...