高羊茅四季常绿吗?

小说:高羊茅四季常绿吗?作者:纯戏更新时间:2019-04-24字数:83900

概述

  正如其它RPC或者RMI框架那样,Akka也提供了远程调用的能力。服务端在监听的端口上接收客户端的调用。本文将在《Spring与Akka的集成》一文的基础上介绍Akka的remote调用,本文很多代码和例子来源于Akka官网的代码示例,也包含了一些适用于Spring集成的改造,本文旨在介绍Akka的远程调用的开发过程。

服务端开发

配置

  Akka的默认配置文件为application.conf,如果不特别指明,Akka System都会默认加载此配置。如果你想自定义符合你习惯的名字,可以使用如下代码:

final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));  

上述代码中的yourconf不包含文件后缀名,在你的资源路径下实际是yourconf.conf。

  我不太想自定义加载的配置文件,而是继续使用application.conf,这里先列出其配置:

include "common"  
  
akka {  
  # LISTEN on tcp port 2552  
  remote.netty.tcp.port = 2552  
}  

这里的remote.netty.tcp.port配置属性表示使用Netty框架在TCP层的监听端口是2552。include与java里的import或者jsp页面中的include标签的作用类似,表示引用其他配置文件中的配置。由于common.conf中包含了Akka的一些公共配置,所以可以这样引用,common.conf的配置如下:

akka {  
  
  actor {  
    provider = "akka.remote.RemoteActorRefProvider"  
  }  
  
  remote {  
    netty.tcp {  
      hostname = "127.0.0.1"  
    }  
  }  
  
}  

common配置中的provider属性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即远程ActorRef的提供者。这里的hostname属性表示服务器的主机名。从common配置我们还可以看出Akka的配置有点类似于json,也是一种嵌套结构。此外,Akka还可以采用一种扁平的配置方式,例如:

akka.actor.provider = "..."  
akka.remote.netty.tcp.hostname = "127.0.0.1"  

它们所代表的作用是一样的。至于选择扁平还是嵌套的,一方面依据你的个人习惯,一方面依据配置的多寡——随着配置项的增多,你会发现嵌套会让你的配置文件更加清晰。

服务端

  由于官网的例子比较简洁并能说明问题,所以本文对Akka官网的例子进行了一些改造来介绍服务端与客户端之间的远程调用。服务端的配置已在上一小节列出,本小节着重介绍服务端的实现。

我们的服务端是一个简单的提供基本的加、减、乘、除的服务的CalculatorActor,这些运算都直接封装在CalculatorActor的实现中(在实际的业务场景中,Actor应该只接收、回复及调用具体的业务接口,这里的加减乘除运算应当由指定的Service接口实现,特别是在J2EE或者与Spring集成后),CalculatorActor的实现见代码清单1。

代码清单1

@Named("CalculatorActor")  
@Scope("prototype")  
public class CalculatorActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class);  
  
    @Override  
    public void onReceive(Object message) {  
  
        if (message instanceof Op.Add) {  
            Op.Add add = (Op.Add) message;  
            logger.info("Calculating " + add.getN1() + " + " + add.getN2());  
            Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Subtract) {  
            Op.Subtract subtract = (Op.Subtract) message;  
            logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2());  
            Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(),  
                    subtract.getN1() - subtract.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Multiply) {  
            Op.Multiply multiply = (Op.Multiply) message;  
            logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2());  
            Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(),  
                    multiply.getN1() * multiply.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Divide) {  
            Op.Divide divide = (Op.Divide) message;  
            logger.info("Calculating " + divide.getN1() + " / " + divide.getN2());  
            Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(),  
                    divide.getN1() / divide.getN2());  
            getSender().tell(result, getSelf());  
  
        } else {  
            unhandled(message);  
        }  
    }  
}  

Add、Subtract、Multiply、Divide都继承自MathOp,这里只列出MathOp和Add的实现,见代码清单2所示。

代码清单2

public interface MathOp extends Serializable {  
}  
  
public static class Add implements MathOp {  
    private static final long serialVersionUID = 1L;  
    private final int n1;  
    private final int n2;  
  
    public Add(int n1, int n2) {  
        this.n1 = n1;  
        this.n2 = n2;  
    }  
  
    public int getN1() {  
        return n1;  
    }  
  
    public int getN2() {  
        return n2;  
    }  
} 

服务端应当启动CalculatorActor实例,以提供服务,代码如下:

logger.info("Start calculator");  
final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator");  
actorMap.put("calculator", calculator);  
logger.info("Started calculator"); 

客户端

  客户端调用远程CalculatorActor提供的服务后,还要接收其回复信息,因此也需要监听端口。客户端和服务端如果在同一台机器节点上,那么客户端的监听端口不能与服务端冲突,我给出的配置示例如下:

include "common"  
  
akka {  
  remote.netty.tcp.port = 2553  
}  

客户端通过远程Actor的路径获得ActorSelection,然后向远程的Akka System获取远程CalculatorActor的ActorRef,进而通过此引用使用远端CalculatorActor提供的服务。在详细的说明实现细节之前,先来看看LookupActor的实现,见代码清单3所示。

代码清单3

@Named("LookupActor")  
@Scope("prototype")  
public class LookupActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(LookupActor.class);  
  
    private final String path;  
    private ActorRef calculator = null;  
  
    public LookupActor(String path) {  
        this.path = path;  
        sendIdentifyRequest();  
    }  
  
    private void sendIdentifyRequest() {  
        getContext().actorSelection(path).tell(new Identify(path), getSelf());  
        getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(),  
                ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf());  
    }  
  
    @Override  
    public void onReceive(Object message) throws Exception {  
        if (message instanceof ActorIdentity) {  
            calculator = ((ActorIdentity) message).getRef();  
            if (calculator == null) {  
                logger.info("Remote actor not available: " + path);  
            } else {  
                getContext().watch(calculator);  
                getContext().become(active, true);  
            }  
  
        } else if (message instanceof ReceiveTimeout) {  
            sendIdentifyRequest();  
  
        } else {  
            logger.info("Not ready yet");  
  
        }  
    }  
  
    Procedure<Object> active = new Procedure<Object>() {  
        @Override  
        public void apply(Object message) {  
            if (message instanceof Op.MathOp) {  
                // send message to server actor  
                calculator.tell(message, getSelf());  
  
            } else if (message instanceof Op.AddResult) {  
                Op.AddResult result = (Op.AddResult) message;  
                logger.info(String.format("Add result: %d + %d = %d
", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
  
            } else if (message instanceof Op.SubtractResult) {  
                Op.SubtractResult result = (Op.SubtractResult) message;  
                logger.info(String.format("Sub result: %d - %d = %d
", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
                  
            } else if (message instanceof Terminated) {  
                logger.info("Calculator terminated");  
                sendIdentifyRequest();  
                getContext().unbecome();  
  
            } else if (message instanceof ReceiveTimeout) {  
                // ignore  
  
            } else {  
                unhandled(message);  
            }  
  
        }  
    };  
}

LookupActor的构造器需要传递远端CalculatorActor的路径,并且调用了sendIdentifyRequest方法,sendIdentifyRequest的作用有两个:

  1. 通过向ActorSelection向远端的Akka System发送Identify消息,并获取远程CalculatorActor的ActorRef;
  2. 启动定时调度,3秒后向CalculatorActor的执行上下文发送ReceiveTimeout消息,而LookupActor处理ReceiveTimeout消息时,再次调用了sendIdentifyRequest方法。
为何要循环调用sendIdentifyRequest方法呢?由于远端服务有可能因为进程奔溃、系统重启等原因导致已经获得的ActorRef过期或失效,因此需要一个监测机制。sendIdentifyRequest的循环调用就是一个简单的检测机制。
远端的Akka System在接收到Identify消息后,会给LookupActor回复ActorIdentity消息,LookupActor收到ActorIdentity消息后便可以解析出消息中载有的CalculatorActor的ActorRef,LookupActor然后调用getContext().watch(calculator)实现对子Actor的监管,一旦CalculatorActor重启或终止,LookupActor便可以接收到Terminated消息(有关Actor的监管机制,可以阅读官方文档)。
由于LookupActor的onReceive无法处理加、减、乘、除及Terminated消息,所以这里用到了一个Akka Actor的状态转换,通过使用getContext().become(active, true)。这里的active是一个内部类,其继承了Procedure并重写了apply方法,其中封装了对于对于加减乘除的计算以及结果、Terminated消息的处理。通过getContext().become(active, true),使得active接替onReceive方法处理接收到的消息。正如Akka官网所述——Actor的这一特性非常适合于开发实现FSM(有限状态自动机)。
active的功能主要分为三类:
  • 如果收到MathOp的消息,说明是加减乘除的消息,则将消息进一步告知远端的CalculatorActor并由其进行处理;
  • 如果收到AddResult或者SubtractResult,这说明CalculatorActor已经处理完了加或者减的处理,并回复了处理结果,因此对计算结果进行使用(本例只是简单的打印);
  • 如果收到了Terminated消息,说明远端的CalculatorActor停止或者重启了,因此需要重新调用sendIdentifyRequest获取最新的CalculatorActor的ActorRef。最后还需要取消active,恢复为默认接收消息的状态;
启动客户端的代码示例如下:
logger.info("start lookup");  
final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator";  
final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup");  
final Random r = new Random();  
actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() {  
  
    @Override  
    public void run() {  
        if (r.nextInt(100) % 2 == 0) {  
            lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null);  
        } else {  
            lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null);  
        }  
  
    }  
}, actorSystem.dispatcher());   
这里的客户端示例以1秒的周期,向LookupActor随机发送Add或者Subtract的消息。

Actor远端调用模型

  无论是本地Actor还是远端Actor,Actor之所以能够接收消息,是因为每个Actor都有其自身的邮箱,你可以定制自己的邮箱(可以用java中的各种队列)。本地应用如果想要调用远端的Actor服务并接收返回信息也就必须拥有自己的邮箱,否则邮递员投递信件时由于无法找到你家的邮箱,可能会打回邮件、放在你家的门缝下甚至丢弃。因此Actor的调用无论是本地的还是远端的都最好遵守Actor的编程模型,就像下图所示。
 

运行结果

  我的客户端和服务端都运行于本地,客户端tcp监听端口是2553,服务端监听端口是2552,由于本例子的代码较为健壮,所以客户端、服务端可以以任意顺序启动。客户端运行后的日志如下图所示:

 

服务端的运行日志如下图所示:

 

总结

  Akka的远端调用是大家在使用时最常用的特性之一,掌握起来不是什么难事,如何实现处理多种消息,并考虑其稳定性、健壮性是需要详细考虑的。

后记:经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
 
售卖链接如下:
京东:https://item.jd.com/12302500.html

当前文章:http://cnsdbtzg.com/yvlhb.html

发布时间:2019-04-24 01:22:58

5米的木香花多少钱一棵? 在新疆可以种植栾树吗? 贴梗海棠小苗多少钱一棵? 果岭草种子价格 桃金娘种子几月播种最好? 黑麦草种子需要浸泡吗? 凌霄花种子怎么播种? 花海有哪些品种? 四季开花边坡绿化草本植物有哪些? 绿化种植哪些草花?

43949 19437 81882 24407 53435 56297 71988 37395 31592 96349 87166 28970 29584 77345 41405 69923 13643 70619 74173 85348 88785 63009 95615

我要说两句: (0人参与)

发布