Scala高级 一 高阶函数 scala 混合了面向对象和函数式的特性,在函数式编程语言中,函数是“头等公民”,它和Int、String、Class等其他类型处于同等的地位,可以像其他类型的变量一样被传递和操作。
高阶函数包含
1作为值得函数 在scala中,函数就像和数字、字符串一样,可以将函数传递给一个方法。我们可以对算法进行封装,然后将具体的动作传递给方法,这种特性很有用。
我们之前学习过List的map方法,它就可以接收一个函数,完成List的转换。
例子
示例说明
将一个整数列表中的每个元素转换为对应个数的小星星
1 List(1, 2, 3...) => *, **, ***
步骤
创建一个函数,用于将数字转换为指定个数的小星星
创建一个列表,调用map方法
打印转换为的列表
参考代码
1 2 3 4 5 6 7 8 9 10 11 package com.nicai.highlevelobject Demo01 { def main (args: Array [String ]): Unit = { val fun: Int => String = (num:Int ) => "*" * num val strings = (1 to 10 ).map(fun) println(strings) } }
2 匿名函数 定义
上面的代码,给(num:Int) => “*” * num函数赋值给了一个变量,但是这种写法有一些啰嗦。在scala中,可以不需要给函数赋值给变量,没有赋值给变量的函数就是*匿名函数
1 2 3 4 5 6 val list = List (1 , 2 , 3 , 4 )val func_num2star = (num:Int ) => "*" * numprint(list.map(func_num2star))
使用匿名函数优化上述代码
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 object Demo02 { def main (args: Array [String ]): Unit = { val strings = (1 to 10 ).map(x => "*" * x) println(strings) val strings2 = (1 to 10 ).map("*" * _) println(strings2) } }
3 柯里化 在scala和spark的源代码中,大量使用到了柯里化。为了后续方便阅读源代码,我们需要来了解下柯里化。
定义:
柯里化(Currying)是指将原先接受多个参数的方法转换为多个只有一个参数的参数列表的过程。
柯里化过程解析
例子
示例说明
编写一个方法,用来完成两个Int类型数字的计算
具体如何计算封装到函数中
使用柯里化来实现上述操作
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.nicai.highlevelobject Demo3 { def add (a:Int ,b:Int )(cala:(Int ,Int ) => Int )={ cala(a,b) } def main (args: Array [String ]): Unit = { println(add(1 , 2 )(_ + _)) println(add(1 , 2 )(_ * _)) } }
闭包 闭包其实就是一个函数,只不过这个函数的返回值依赖于声明在函数外部的变量。
可以简单认为,就是可以访问不在当前作用域范围的一个函数。
例子一
定义一个闭包
1 2 3 4 5 6 7 8 9 10 11 package com.nicai.highlevelobject Demo4 { var x=4 val add=(y:Int ) => x+y def main (args: Array [String ]): Unit = { println(add(5 )) } }
add函数就是一个闭包
例子二
柯里化就是一个闭包
1 2 3 def add (x:Int )(y:Int ) = { x + y }
上述代码相当于
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def add (x:Int ) = { (y:Int ) => x + y } 总的: package com.nicai.highlevelobject Demo5 { def add (a:Int )(b:Int ): Int ={ a+b } def add2 (x:Int )={ (y:Int ) => x+y } def main (args: Array [String ]): Unit = { println(add(5 )(6 )) println(add2(5 )(6 )) } }
二 隐式转换与隐式参数 隐式转换和隐式参数是scala非常有特色的功能,也是Java等其他编程语言没有的功能。我们可以很方便地利用隐式转换来丰富现有类的功能。后面在编写Akka并发编程、Spark SQL、Flink都会看到隐式转换和隐式参数的身影。
1 使用隐式转换 定义:
所谓隐式转换 ,是指以implicit关键字声明的带有单个参数 的方法。它是自动被调用 的,自动将某种类型转换为另外一种类型。
使用步骤
在object中定义隐式转换方法(使用implicit)
在需要用到隐式转换的地方,引入隐式转换(使用import)
自动调用隐式转化后的方法
例子
示例说明
使用隐式转换,让File具备有read功能——实现将文本中的内容以字符串形式读取出来
步骤
创建RichFile类,提供一个read方法,用于将文件内容读取为字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 2. 定义一个隐式转换方法,将File隐式转换为RichFile对象 3. 创建一个File,导入隐式转换,调用File的read方法 **参考代码** ```scala package com.nicai.yinshizhuanhuan import java.io.File import scala.io.Source //隐式转换 /*1. 创建RichFile类,提供一个read方法,用于将文件内容读取为字符串 2. 定义一个隐式转换方法,将File隐式转换为RichFile对象 3. 创建一个File,导入隐式转换,调用File的read方法*/ object Demo6 { class RichFile(f:File){ def read()={ Source.fromFile(f).mkString } } object Im{ implicit def fileToRichFile(file:File) =new RichFile(file) } def main(args: Array[String]): Unit = { val file = new File("day23Scala4/data/a.txt") import Im.fileToRichFile println(file.read()) } }
**隐式转换的时机**
当对象调用类中不存在的方法或者成员时,编译器会自动将对象进行隐式转换
当方法中的参数的类型与目标类型不一致时
2 自动导入隐式转化方法 前面,我们手动使用了import来导入隐式转换。是否可以不手动import呢?
在scala中,如果在当前作用域中有隐式转换方法,会自动导入隐式转换。
示例:将隐式转换方法定义在main所在的object中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.nicai.yinshizhuanhuanimport java.io.File import scala.io.Source object Demo7 { class RichFile (f:File ) { def read ()={ Source .fromFile(f).mkString } } def main (args: Array [String ]): Unit = { val file = new File ("day23Scala4/data/a.txt" ) implicit def fileToRichFile (file:File ) =new RichFile (file) println(file.read()) } }
3 隐式参数 方法可以带有一个标记为implicit的参数列表。这种情况,编译器会查找缺省值,提供给该方法。
定义
在方法后面添加一个参数列表,参数使用implicit修饰
在object中定义implicit修饰的隐式值
调用方法,可以不传入implicit修饰的参数列表,编译器会自动查找缺省值
注意:
和隐式转换一样,可以使用import手动导入隐式参数
如果在当前作用域定义了隐式值,会自动进行导入
例子
示例说明
定义一个方法,可将传入的值,使用一个分隔符前缀、后缀包括起来
使用隐式参数定义分隔符
调用该方法,并打印测试
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.nicai.yinshizhuanhuanobject Demo8 { def qu (str:String )(implicit im:(String ,String )) ={ im._1+str+im._2 } object Im { implicit val delim= ("<<" ,">>" ) } def main (args: Array [String ]): Unit = { import Im .delim println(qu("aa" )) } }
三 Akka 并发编程 1 介绍 Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用的工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。
2 特性
提供基于异步非阻塞、高性能的事件驱动编程模型
内置容错机制,允许Actor在出错时进行恢复或者重置操作
超级轻量级的事件处理(每GB堆内存几百万Actor)
使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。
3 Akka通信过程 以下图片说明了Akka Actor的并发编程模型的基本流程:
学生创建一个ActorSystem
通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRef
ActorRef将消息发送给Message Dispatcher(消息分发器)
Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中
Message Dispatcher将MailBox放到一个线程中
MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中
4 入门案例 基于Akka创建两个Actor,Actor之间可以互相发送消息。
实现步骤
创建Maven模块
创建并加载Actor
发送/接收消息
1创建Maven模块
使用Akka需要导入Akka库,我们这里使用Maven来管理项目
创建Maven模块
打开pom.xml文件,导入akka Maven依赖和插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 <properties > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <encoding > UTF-8</encoding > <scala.version > 2.11.8</scala.version > <scala.compat.version > 2.11</scala.compat.version > </properties > <dependencies > <dependency > <groupId > org.scala-lang</groupId > <artifactId > scala-library</artifactId > <version > ${scala.version}</version > </dependency > <dependency > <groupId > com.typesafe.akka</groupId > <artifactId > akka-actor_2.11</artifactId > <version > 2.3.14</version > </dependency > <dependency > <groupId > com.typesafe.akka</groupId > <artifactId > akka-remote_2.11</artifactId > <version > 2.3.14</version > </dependency > </dependencies > <build > <sourceDirectory > src/main/scala</sourceDirectory > <testSourceDirectory > src/test/scala</testSourceDirectory > <plugins > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.2</version > <executions > <execution > <goals > <goal > compile</goal > <goal > testCompile</goal > </goals > <configuration > <args > <arg > -dependencyfile</arg > <arg > ${project.build.directory}/.scala_dependencies</arg > </args > </configuration > </execution > </executions > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-shade-plugin</artifactId > <version > 2.4.3</version > <executions > <execution > <phase > package</phase > <goals > <goal > shade</goal > </goals > <configuration > <filters > <filter > <artifact > *:*</artifact > <excludes > <exclude > META-INF/*.SF</exclude > <exclude > META-INF/*.DSA</exclude > <exclude > META-INF/*.RSA</exclude > </excludes > </filter > </filters > <transformers > <transformer implementation ="org.apache.maven.plugins.shade.resource.AppendingTransformer" > <resource > reference.conf</resource > </transformer > <transformer implementation ="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" > <mainClass > </mainClass > </transformer > </transformers > </configuration > </execution > </executions > </plugin > </plugins > </build >
2创建并加载Actor
创建两个Actor
SenderActor:用来发送消息
1 2 3 4 5 6 7 8 9 10 11 package com.nicai.akkademoimport akka.actor.Actor object SenderActor extends Actor { override def receive : Receive = { case x => println(x) } }
ReceiveActor:用来接收,回复消息
1 2 3 4 5 6 7 8 9 10 package com.nicai.akkademoimport akka.actor.Actor object ReceiveActor extends Actor { override def receive : Receive = { case x => println(x) } }
创建Actor
创建ActorSystem
创建自定义Actor
ActorSystem加载Actor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.nicai.akkademoimport akka.actor.{ActorSystem , Props }import com.typesafe.config.ConfigFactory object Entrance { def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val senderActor = actorSystem.actorOf(Props (SenderActor ),"senderActor" ) val receiveActor = actorSystem.actorOf(Props (ReceiveActor ),"receiveActor" ) } }
3发送/接收消息
使用样例类封装消息
SubmitTaskMessage——提交任务消息
SuccessSubmitTaskMessage——任务提交成功消息
使用类似于之前学习的Actor方式,使用!
发送异步消息
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 完整版 case class MessageSub (msg:String )//receive回复消息 case class MsgSuccess (msg:String )............................................................................ import akka .actor .Actor //发送消息 object SenderActor extends Actor { override def receive : Receive = { case MessageSub ("start" ) => { println("收到消息" ) val receiveActor = context.actorSelection("akka://actorSystem/user/receiveActor" ) receiveActor ! MessageSub ("nicai" ) } case MsgSuccess (name) =>{ println(name) } } } ............................................................................... import akka.actor.Actor object ReceiveActor extends Actor { override def receive : Receive = { case MessageSub (name) => { println(name) sender ! MsgSuccess ("我不猜" ) } case _ => println("未匹配的消息类型" ) } } ............................................................................ import akka.actor.{ActorSystem , Props }import com.typesafe.config.ConfigFactory object Entrance { def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" , ConfigFactory .load()) val senderActor = actorSystem.actorOf(Props (SenderActor ),"senderActor" ) val receiveActor = actorSystem.actorOf(Props (ReceiveActor ),"receiveActor" ) senderActor ! MessageSub ("start" ) } }
程序输出:
5 Akka定时任务 如果我们想要使用Akka框架定时的执行一些任务,该如何处理呢?
使用方式:
Akka中,提供一个scheduler 对象来实现定时调度功能。使用ActorSystem.scheduler.schedule方法,可以启动一个定时任务。
schedule方法针对scala提供两种使用形式:
第一种:发送消息
1 2 3 4 5 6 def schedule ( initialDelay: FiniteDuration , interval: FiniteDuration , receiver: ActorRef , message: Any ) (implicit executor: ExecutionContext )
第二种:自定义实现
1 2 3 4 5 def schedule ( initialDelay: FiniteDuration , interval: FiniteDuration )(f: ⇒ Unit ) (implicit executor: ExecutionContext )
示例一 示例说明
定义一个Actor,每1秒发送一个消息给Actor,Actor收到后打印消息
使用发送消息方式实现
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import akka.actor.{Actor , ActorSystem , Props }import com.typesafe.config.ConfigFactory object Demo1 { object ReceiveActor extends Actor { override def receive : Receive = { case x => println(x) } } def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" ,ConfigFactory .load()) val receiveActor = actorSystem.actorOf(Props (ReceiveActor )) import scala.concurrent.duration._ import actorSystem.dispatcher actorSystem.scheduler.schedule(0 seconds, 1 seconds, receiveActor, "hello" ) } }
示例二 示例说明
定义一个Actor,每1秒发送一个消息给Actor,Actor收到后打印消息
使用自定义方式实现
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import akka.actor.{Actor , ActorSystem , Props }import com.typesafe.config.ConfigFactory object Demo2 { object ReceiceActor extends Actor { override def receive : Receive = { case x => println(x) } } def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" ,ConfigFactory .load()) val receiveActor = actorSystem.actorOf(Props (ReceiceActor )) import scala.concurrent.duration._ import actorSystem.dispatcher actorSystem.scheduler.schedule(0 seconds,1 seconds)( receiveActor ! " iac" ) } }
注意:
需要导入隐式转换import scala.concurrent.duration._
才能调用0 seconds方法
需要导入隐式参数import actorSystem.dispatcher
才能启动定时任务
6 实现两个进程间的通信 master实现 基于Akka实现在两个进程 间发送、接收消息。Worker启动后去连接Master,并发送消息,Master接收到消息后,再回复Worker消息。
1. Worker实现 步骤
创建一个Maven模块,导入依赖和配置文件
创建配置文件
1 2 3 4 5 application.conf akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.remote.netty.tcp.hostname = "127.0.0.1" akka.remote.netty.tcp.port = "8888"
创建启动WorkerActor
发送”setup”消息给WorkerActor,WorkerActor接收打印消息
启动测试
参考代码
Worker.scala
1 2 3 4 5 6 7 8 9 10 11 12 import akka.actor.{Actor , ActorSystem , Props }import com.typesafe.config.ConfigFactory object Worker { def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" ,ConfigFactory .load()) val worker = actorSystem.actorOf(Props (WorkerActor )) worker ! "nicai" } }
WorkerActor.scala
1 2 3 4 5 6 7 8 import akka.actor.Actor object WorkerActor extends Actor { override def receive : Receive = { case x => println(x) } }
2. Master实现 步骤
创建Maven模块,导入依赖和配置文件
创建启动MasterActor
WorkerActor发送”connect”消息给MasterActor
MasterActor回复”success”消息给WorkerActor
WorkerActor接收并打印接收到的消息
启动Master、Worker测试
参考代码
Master.scala
1 2 3 4 5 6 7 8 9 10 import akka.actor.{ActorSystem , Props }import com.typesafe.config.ConfigFactory object Master { def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" ,ConfigFactory .load()) val masterActor = actorSystem.actorOf(Props (MasterActor ),"masterActor" ) } }
MasterActor.scala
1 2 3 4 5 6 7 8 9 10 11 import akka.actor.Actor object MasterActor extends Actor { override def receive : Receive = { case "connect" => { println("worker连接成功" ) sender ! "success" } } }
WorkerActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import akka.actor.Actor object WorkerActor extends Actor { override def receive : Receive = { case "start" => { println("start" ) val masterActor = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:9999/user/masterActor" ) masterActor ! "connect" } case "success" => { println("连接master成功" ) } } }
四 简易Spark通信框架案例 案例介绍
模拟Spark的Master与Worker通信
一个Master
若干个Worker(Worker可以按需添加)
实现思路
构建Master、Worker阶段
构建Master ActorSystem、Actor
构建Worker ActorSystem、Actor
Worker注册阶段
Worker进程向Master注册(将自己的ID、CPU核数、内存大小(M)发送给Master)
Worker定时发送心跳阶段
Master定时心跳检测阶段
Master定期检查Worker心跳,将一些超时的Worker移除,并对Worker按照内存进行倒序排序
多个Worker测试阶段
启动多个Worker,查看是否能够注册成功,并停止某个Worker查看是否能够正确移除
工程搭建
项目使用Maven搭建工程
分别搭建几下几个项目
工程名
说明
spark-demo-common
存放公共的消息、实体类
spark-demo-master
Akka Master节点
spark-demo-worker
Akka Worker节点
导入依赖
master/worker添加common依赖,其余同上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 common依赖 <properties > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <encoding > UTF-8</encoding > <scala.version > 2.11.8</scala.version > <scala.compat.version > 2.11</scala.compat.version > </properties > <dependencies > <dependency > <groupId > org.scala-lang</groupId > <artifactId > scala-library</artifactId > <version > ${scala.version}</version > </dependency > </dependencies > <build > <sourceDirectory > src/main/scala</sourceDirectory > <testSourceDirectory > src/test/scala</testSourceDirectory > <plugins > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.2</version > <executions > <execution > <goals > <goal > compile</goal > <goal > testCompile</goal > </goals > <configuration > <args > <arg > -dependencyfile</arg > <arg > ${project.build.directory}/.scala_dependencies</arg > </args > </configuration > </execution > </executions > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-shade-plugin</artifactId > <version > 2.4.3</version > <executions > <execution > <phase > package</phase > <goals > <goal > shade</goal > </goals > <configuration > <filters > <filter > <artifact > *:*</artifact > <excludes > <exclude > META-INF/*.SF</exclude > <exclude > META-INF/*.DSA</exclude > <exclude > META-INF/*.RSA</exclude > </excludes > </filter > </filters > <transformers > <transformer implementation ="org.apache.maven.plugins.shade.resource.AppendingTransformer" > <resource > reference.conf</resource > </transformer > <transformer implementation ="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" > <mainClass > </mainClass > </transformer > </transformers > </configuration > </execution > </executions > </plugins > </build >
导入配置文件(同上)
修改Master的端口为7000(或者自定义)
修改Worker的端口为7100(或者自定义 最好 8000以上)
构建master和worker
master和masterActor worker和workerActor
同上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import akka.actor.Actor object MasterActor extends Actor { override def receive : Receive = { case x => println(x) } } ........................................ import akka.actor.{ActorSystem , Props }import com.typesafe.config.ConfigFactory object MasterMain { def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" ,ConfigFactory .load()) val masterActor = actorSystem.actorOf(Props (MasterActor ),"masterActor" ) } } ............................................................. import akka.actor.Actor object WorkerActor extends Actor { override def receive : Receive = { case x => println(x) } } .............................. import akka.actor.{ActorSystem , Props }import com.typesafe.config.ConfigFactory object WorkerMain { def main (args: Array [String ]): Unit = { val actorSystem = ActorSystem ("actorSystem" ,ConfigFactory .load()) val workerActor = actorSystem.actorOf(Props (WorkerActor ),"workerActor" ) } }
worker注册实现
在Worker启动时,发送注册消息给Master
步骤
Worker向Master发送注册消息(workerid、cpu核数、内存大小)
随机生成CPU核(1、2、3、4、6、8)
随机生成内存大小(512、1024、2048、4096)(单位M)
Master保存Worker信息,并给Worker回复注册成功消息
启动测试
参考代码
MasterActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import akka.actor.Actor import com.nicai.common.{MsgRegin , MsgSuccess , Pojo }import scala.collection.mutableobject MasterActor extends Actor { private var stringToPojo: mutable.Map [String , Pojo ] = collection.mutable.Map [String ,Pojo ]() override def receive : Receive = { case MsgRegin (workid,cpu,mem) => { println("收到注册消息" +workid+"-" +cpu+"-" +mem) stringToPojo += workid -> Pojo (workid,cpu,mem) sender ! MsgSuccess ("success" ) } } }
Pojo.scala
1 2 case class Pojo (wokid:String ,cpu:Int ,mem:Int )
MsgPackage.scala
1 2 3 4 case class MsgRegin (workid:String ,cpu:Int ,mem:Int )//注册成功回复消息 case class MsgSuccess (success:String )
WorkerActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import java.util.UUID import akka.actor.{Actor , ActorSelection , ActorSystem }import com.nicai.common.{MsgRegin , MsgSuccess }import scala.util.Random object WorkerActor extends Actor { private var actorSelection:ActorSelection =_ private var CPU_lIST :Int =_ private var MEM_LIST :Int =_ private var cpuList=List (1 ,2 ,4 ,8 ) private var memList=List (128 ,256 ,512 ,1024 ,2048 ) override def preStart ()={ actorSelection = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:7000/user/masterActor" ) val workerid:String =UUID .randomUUID().toString var a=new Random () CPU_lIST =cpuList(a.nextInt(cpuList.length)) MEM_LIST =memList(a.nextInt(memList.length)) val regin = MsgRegin (workerid,CPU_lIST ,MEM_LIST ) actorSelection ! regin } override def receive : Receive = { case MsgSuccess (name) => { println("注册后的回复" +name) } } }
worker定时发送心跳
Worker接收到Master返回注册成功后,发送心跳消息。而Master收到Worker发送的心跳消息后,需要更新对应Worker的最后心跳时间。
步骤
编写工具类读取心跳发送时间间隔
创建心跳消息
Worker接收到注册成功后,定时发送心跳消息
Master收到心跳消息,更新Worker最后心跳时间
启动测试
参考代码
修改配置文件:
1 2 3 添加 workerActor中的 //定时发送消息 worker.heartbeat.interval=5
在workerActor中ConfUtil.scala
1 2 3 4 5 6 7 8 import com.typesafe.config.{Config , ConfigFactory }object ConfUtil { private val config: Config = ConfigFactory .load() val `worker.heartbeat.interval` = config.getInt("worker.heartbeat.interval" ) }
MsgPackage.scala
1 2 case class MsgHeartBeat (workid:String ,cpu:Int ,mem:Int )
WorkerActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import java.util.UUID import akka.actor.{Actor , ActorSelection , ActorSystem }import com.nicai.common.{MsgHeartBeat , MsgRegin , MsgSuccess }import scala.util.Random object WorkerActor extends Actor { private var actorSelection: ActorSelection = _ private var workerid: String = _ private var CPU_lIST : Int = _ private var MEM_LIST : Int = _ private var cpuList = List (1 , 2 , 4 , 8 ) private var memList = List (128 , 256 , 512 , 1024 , 2048 ) override def preStart () = { actorSelection = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:7000/user/masterActor" ) workerid = UUID .randomUUID().toString var a = new Random () CPU_lIST = cpuList(a.nextInt(cpuList.length)) MEM_LIST = memList(a.nextInt(memList.length)) val regin = MsgRegin (workerid, CPU_lIST , MEM_LIST ) actorSelection ! regin } override def receive : Receive = { case MsgSuccess (name) => { println("注册后的回复" + name) import scala.concurrent.duration._ import context.dispatcher context.system.scheduler.schedule(0 seconds, ConfUtil .`worker.heartbeat.interval` seconds ) { actorSelection ! MsgHeartBeat (workerid,CPU_lIST ,MEM_LIST ) } } } }
MasterActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import java.util.Date import akka.actor.Actor import com.nicai.common.{MsgHeartBeat , MsgRegin , MsgSuccess , Pojo }import scala.collection.mutableobject MasterActor extends Actor { private var stringToPojo: mutable.Map [String , Pojo ] = collection.mutable.Map [String ,Pojo ]() override def receive : Receive = { case MsgRegin (workid,cpu,mem) => { println("收到注册消息" +workid+"-" +cpu+"-" +mem) stringToPojo += workid -> Pojo (workid,cpu,mem,new Date ().getTime) sender ! MsgSuccess ("success" ) } case MsgHeartBeat (workid,cpu,mem)=>{ println("接收到心跳" ) stringToPojo += workid -> Pojo (workid,cpu,mem,new Date ().getTime) println(stringToPojo) } } }
master定时心跳检测
如果某个worker超过一段时间没有发送心跳,Master需要将该worker从当前的Worker集合中移除。可以通过Akka的定时任务,来实现心跳超时检查。
步骤
编写工具类,读取检查心跳间隔时间间隔、超时时间
定时检查心跳,过滤出来大于超时时间的Worker
移除超时的Worker
对现有Worker按照内存进行降序排序,打印可用Worker
参考代码
1 2 3 4 5 6 修改 master的配置文件 //检查worker心跳的时间周期 master.heartbeat.check.interval=6 //配置worker的心跳超时时间 master.heartbeat.check.timeout=15
ConfigUtil.scala
1 2 3 4 5 6 7 8 9 10 import com.typesafe.config.{Config , ConfigFactory }object ConfigUtil { private val config: Config = ConfigFactory .load() val `master.heartbeat.check.interval` = config.getInt("master.heartbeat.check.interval" ) val `master.heartbeat.check.timeout` = config.getInt("master.heartbeat.check.timeout" ) }
MasterActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import java.util.Date import akka.actor.Actor import com.nicai.common.{MsgHeartBeat , MsgRegin , MsgSuccess , Pojo }import scala.collection.mutableobject MasterActor extends Actor { private var stringToPojo: mutable.Map [String , Pojo ] = collection.mutable.Map [String ,Pojo ]() override def preStart (): Unit = { import scala.concurrent.duration._ import context.dispatcher context.system.scheduler.schedule(0 seconds, ConfigUtil .`master.heartbeat.check.interval` seconds){ val timeoutWorkerMap = stringToPojo.filter { keyval => val lastHeartBeatTime = keyval._2.heartBeat if (new Date ().getTime - lastHeartBeatTime > ConfigUtil .`master.heartbeat.check.timeout` * 1000 ) { true } else { false } } if (!timeoutWorkerMap.isEmpty) { stringToPojo --= timeoutWorkerMap.map(_._1) val workerList = stringToPojo.map(_._2).toList val sortedWorkerList = workerList.sortBy(_.mem).reverse println("按照内存降序排序后的Worker列表:" ) println(sortedWorkerList) } } } override def receive : Receive = { case MsgRegin (workid,cpu,mem) => { println("收到注册消息" +workid+"-" +cpu+"-" +mem) stringToPojo += workid -> Pojo (workid,cpu,mem,new Date ().getTime) sender ! MsgSuccess ("success" ) } case MsgHeartBeat (workid,cpu,mem)=>{ println("接收到心跳" ) stringToPojo += workid -> Pojo (workid,cpu,mem,new Date ().getTime) println(stringToPojo) } } }
多个worker测试
修改配置文件,启动多个worker进行测试。
步骤
测试启动新的Worker是否能够注册成功 (修改worker的端口号即可)
停止Worker,测试是否能够从现有列表删除