#一 样例类
样例类是一种特殊类,它可以用来快速定义一个用于保存数据 的类(类似于Java POJO类),而且它会自动生成apply方法,允许我们快速地创建样例类实例对象。后面,在并发编程和spark、flink这些框架也都会经常使用它。
样例类可以使用**类名(参数1, 参数2…)**快速创建实例对象
定义样例类成员变量时,可以指定var类型,表示可变。默认是不可变的 val 可省略
样例类自动生成了toString、equals、hashCode、copy方法
样例对象没有主构造器,可以使用样例对象来创建枚举、或者标识一类没有任何数据的消息
语法:
1 case class 样例类名 (成员变量名1:类型1, 成员变量名2:类型2, 成员变量名3:类型3 )
定义样例类
1 2 3 4 5 6 7 8 9 10 object Demo1 { case class Per (name:String , var age:Int ) def main (args: Array [String ] ) : Unit = { val per = Per ("你猜" ,25 ) per.age=25 println(per) } }
样例类的方法 定义样例类编译器自动帮我们实现了一下几个方法
1 2 3 4 5 apply 快速的用类名创建对象 toString 与java同 equals 比较两个样例类的成员变量是否相等 与==类似 hashCode 如两个样例类的所有的成员变量的值相等 则hash值相等 否则 只要一个不同 则hash值就不等 copy 样例类的克隆
1 2 3 4 5 6 7 8 9 10 object Demo2 { case class Per (name:String ,age:Int ) def main (args: Array [String ] ) : Unit = { val nicai = Per ("nicai" ,55 ) val nn = nicai.copy("nn" ) println(nn) } }
样例对象 使用case object可以创建样例对象。样例对象是单例的,而且它没有主构造器 。样例对象是可序列化的。格式:
它主要用在两个地方:
定义枚举
作为没有任何参数的消息传递(后面Akka编程会讲到)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 object Demo3 { trait Sex case object Man extends Sex case object Wonmn extends Sex case class Per (name:String ,sex: Sex ) def main (args: Array [String ] ) : Unit = { val per = Per ("小明" ,Man ) val p = Per ("小红" ,Wonmn ) println(per) println(p) } }
定义消息
1 2 3 4 5 case class StartSpeakingMessage (textToSpeak: String )// 消息如果没有任何参数,就可以定义为样例对象 case object StopSpeakingMessage case object PauseSpeakingMessage case object ResumeSpeakingMessage
二模板匹配 scala中有一个非常强大的模式匹配机制,可以应用在很多场景:
简单模式匹配 相当于java中的switch语句
语法
1 2 3 4 5 6 变量 match { case "常量1" => 表达式1 case "常量2" => 表达式2 case "常量3" => 表达式3 case _ => 表达式4 }
1 2 3 4 5 6 7 8 9 10 11 12 13 object Demo1 { def main (args: Array [String ]): Unit = { val str = StdIn .readLine() val unit = str match { case "hadoop" => "nicai" case "spaker" => "分布式计算框架" case _ => "未匹配" } println(unit) } }
匹配类型 根据不同的数据类型进行匹配
1 2 3 4 5 6 7 变量 match { case 类型1 变量名: 类型1 => 表达式1 case 类型2 变量名: 类型2 => 表达式2 case 类型3 变量名: 类型3 => 表达式3 ... case _ => 表达式4 }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 object Demo2 { var a:Any ="hadoop" def main (args: Array [String ]): Unit = { val unit = a match { case x: String => s"${x} 字符串" case x: Int => "整形" case x: Double => "浮点型" case _ => "没匹配" } println(unit) } }
守卫 在Java中,只能简单地添加多个case标签,例如:要匹配0-7,就需要写出来8个case语句。例如:
1 2 3 4 5 6 7 8 9 10 11 12 int a = 0 ; switch(a) { case 0 : a += 1 ; case 1 : a += 1 ; case 2 : a += 1 ; case 3 : a += 1 ; case 4 : a += 2 ; case 5 : a += 2 ; case 6 : a += 2 ; case 7 : a += 2 ; default : a = 0 ; }
在scala中,可以使用守卫来简化上述代码——也就是在case语句中添加if条件判断 。
1 2 3 4 5 6 7 8 9 10 11 12 13 object Demo3 { private val i: Int = StdIn .readInt() def main (args: Array [String ]): Unit = { val unit = i match { case x if x > 0 && x < 3 => "0-3" case x if x > 3 && x < 10 => println("3-10" ) case x if x > 10 && x < 13 => println("10-13" ) case _ => println("weipi" ) } println(unit) } }
匹配样例类 scala可以使用模式匹配来匹配样例类,从而可以快速获取样例类中的成员数据。后续,我们在开发Akka案例时,还会用到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 object Demo4 { case class Per (name:String ,age:Int ) case class Stu (name:String ,age:Int ) def main (args: Array [String ] ) : Unit = { val ni:Any = Per ("ni" ,55 ) val unit = ni match { case Per (name, age) => s"${name} :${age} per" case Stu (name, age) => s"${name} :${age} stu" case _ => "未匹配" } println(unit) } }
匹配集合 1 匹配数组
依次修改代码定义以下三个数组
1 2 3 Array (1 ,x,y) Array (0 ) Array (0 , ...)
使用模式匹配上述数组
参考代码
1 2 3 4 5 6 7 val arr = Array (1 , 3 , 5 )arr match { case Array (1 , x, y) => println(x + " " + y) case Array (0 ) => println("only 0" ) case Array (0 , _*) => println("0 ..." ) case _ => println("something else" ) }
2匹配列表
依次修改代码定义以下三个列表
1 2 3 List (0 ) List (0 ,...) List (x,y)
使用模式匹配上述列表
参考代码
1 2 3 4 5 6 7 8 val list = List (0 , 1 , 2 )list match { case 0 :: Nil => println("只有0的列表" ) case 0 :: tail => println("0开头的列表" ) case x :: y :: Nil => println(s"只有另两个元素${x} , ${y} 的列表" ) case _ => println("未匹配" ) }
3 匹配元组
依次修改代码定义以下两个元组
使用模式匹配上述元素
参考代码
1 2 3 4 5 6 7 val tuple = (2 , 2 , 5 )tuple match { case (1 , x, y) => println(s"三个元素,1开头的元组:1, ${x} , ${y} " ) case (x, y, 5 ) => println(s"三个元素,5结尾的元组:${x} , ${y} , 5" ) case _ => println("未匹配" ) }
变量声明中的模式匹配 在定义变量的时候,可以使用模式匹配快速获取数据。
1 获取数组中的元素
生成包含0-10数字的数组,使用模式匹配分别获取第二个、第三个、第四个元素
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 package com.nicai.demo.matchdemoobject Demo8 { def main (args: Array [String ]): Unit = { var array=(0 to 10 ).toArray var Array (_,x,y,z,_*)=array println(x) println(y) println(z) } }
2 获取列表中的数据
生成包含0-10数字的列表,使用模式匹配分别获取第一个、第二个元素
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 package com.nicai.demo.matchdemoobject Demo9 { def main (args: Array [String ]): Unit = { var a= (0 to 10 ).toList var x :: y :: tail =a println(x) println(y) } }
option类型 scala中,Option类型来表示可选值。这种类型的数据有两种形式:
Some(x):表示实际的值
None:表示没有值
使用Option类型,可以用来有效避免空引用(null)异常。也就是说,将来我们返回某些数据时,可以返回一个Option类型来替代。
getOrElse方法
使用getOrElse方法,当Option对应的实例是None时,可以指定一个默认值,从而避免空指针异常
scala鼓励使用Option类型来封装数据,可以有效减少,在代码中判断某个值是否为null
可以使用getOrElse方法来针对None返回一个默认值
例子一
定义一个两个数相除的方法,使用Option类型来封装结果
然后使用模式匹配来打印结果
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.nicai.demo.matchdemo object Demo10 { def div (a:Double ,b:Int ): Option [Double ] ={ if (b != 0 ){ Some (a/b) }else { None } } def main (args: Array [String ]): Unit = { val option = div(15.2 ,2 ) val unit = option match { case Some (x) => x case None => "除数不可为0" } println(unit) } }
例子二
重写上述案例,使用getOrElse方法,当除零时,或者默认值为0
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.nicai.demo.matchdemoobject Demo11 { def div (a: Double , b: Int ):Option [Double ]= { if ( b != 0 ){ Some (a/b) }else { None } } def main (args: Array [String ]): Unit = { val d = div(15.6 ,0 ).getOrElse(0 ) println(d) } }
偏函数 偏函数可以提供了简洁的语法,可以简化函数的定义。配合集合的函数式编程,可以让代码更加优雅。
定义
偏函数被包在花括号内没有match的一组case语句是一个偏函数
偏函数是PartialFunction[A, B]的一个实例
可以理解为:偏函数是一个参数和一个返回值的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.nicai.demo.PartialFunctionDemo object Demo12 { private val value: PartialFunction [Int , String ] = { case 1 => "一" case 2 => "二" case _ => "其他" } def main (args: Array [String ]): Unit = { println(value(1 )) } }
定义一个列表,包含1-10的数字
请将1-3的数字都转换为[1-3]
请将4-8的数字都转换为[4-8]
将其他的数字转换为(8-星]
参考代码
1 2 3 4 5 6 7 8 9 val list = (1 to 10 ).toListval list2 = list.map{ case x if x >= 1 && x <= 3 => "[1-3]" case x if x >= 4 && x <= 8 => "[4-8]" case x if x > 8 => "(8-*]" } println(list2)
正则表达式 在scala中,可以很方便地使用正则表达式来匹配数据。
scala中提供了Regex类来定义正则表达式,要构造一个RegEx对象,直接使用String类的r方法即可。
建议使用三个双引号来表示正则表达式,不然就得对正则中的反斜杠来进行转义。
1 val regEx = "" "正则表达式" "" .r
findAllMatchIn方法
使用findAllMatchIn方法可以获取所有正则匹配到的字符串
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 示例说明 定义一个正则表达式,来匹配邮箱是否合法 合法邮箱测试:qq12344@163. com 不合法邮箱测试:qq12344@.com val r = "" ".+@.+\..+" "" .rval eml1 = "qq12344@163.com" val eml2 = "qq12344@.com" if (r.findAllMatchIn(eml1).size > 0 ) { println(eml1 + "邮箱合法" ) } else { println(eml1 + "邮箱不合法" ) } if (r.findAllMatchIn(eml2).size > 0 ) { println(eml2 + "邮箱合法" ) } else { println(eml2 + "邮箱不合法" ) }
找出以下列表中的所有不合法的邮箱
1 "38123845@qq.com", "a1da88123f@gmail.com", "zhansan@163.com", "123afadff.com"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.nicai.demo.zhengzebiaodashiobject Demo15 { def main (args: Array [String ]): Unit = { var a= List ("38123845@qq.com" , "a1da88123f@gmail.com" , "zhansan@163.com" , "123afadff.com" ) val r="" ".+@.+\.com" "" .r val strings = a.filter { case x if r.findAllMatchIn(x).size == 0 => true case _ => false } println(strings) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package com.nicai.demo.zhengzebiaodashiobject Demo16 { def main (args: Array [String ]): Unit = { val re ="" ".+@(.+)\.com" "" .r var li=List ("38123845@qq.com" , "a1da88123f@gmail.com" , "zhansan@163.com" , "123afadff.com" ) val strings = li.map { case x@re (company) => s"${x} -> ${company} " case x => s"${x} + 未知" } println(strings) } }
异常处理 来看看下面一段代码。
1 2 3 4 5 6 7 8 9 def main (args: Array [String ]): Unit = { val i = 10 / 0 println("你好!" ) } Exception in thread "main" java.lang.ArithmeticException : / by zero at ForDemo $.main(ForDemo .scala:3 ) at ForDemo .main(ForDemo .scala)
执行程序,可以看到scala抛出了异常,而且没有打印出来”你好”。说明程序出现错误后就终止了。
那怎么解决该问题呢?
在scala中,可以使用异常处理来解决这个问题
捕获异常 语法格式
1 2 3 4 5 6 7 8 9 10 try { } catch { case ex:异常类型1 => case ex:异常类型2 => } finally { }
try中的代码是我们编写的业务处理代码
在catch中表示当出现某个异常时,需要执行的代码
在finally中,是不管是否出现异常都会执行的代码
示例 示例说明
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 package com.nicai.demo.exceptionDemoobject Demo17 { def main (args: Array [String ]): Unit = { try { var a= 4 /0 }catch { case ex:Exception => println(ex.getMessage) } } }
###抛出异常
我们也可以在一个方法中,抛出异常。语法格式和Java类似,使用throw new Exception...
例子:
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 package com.nicai.demo.exceptionDemo object Demo18 { def main (args: Array [String ]): Unit = { throw new Exception ("这是一个异常" ) } } Exception in thread "main" java.lang.Exception : 这是一个异常 at ForDemo $.main(ForDemo .scala:3 ) at ForDemo .main(ForDemo .scala)
scala不需要在方法上声明要抛出的异常,它已经解决了再Java中被认为是设计失败的检查型异常。
下面是Java代码
1 2 3 public static void main (String[] args) throws Exception { throw new Exception("这是一个异常" ); }
提取器 我们之前已经使用过scala中非常强大的模式匹配功能了,通过模式匹配,我们可以快速匹配样例类中的成员变量.
那是不是所有的类都可以进行这样的模式匹配呢?答案是:
不可以
的。要支持模式匹配,必须要实现一个提取器 。
样例类自动实现了apply、unapply方法
定义提取器 之前我们学习过了,实现一个类的伴生对象中的apply方法,可以用类名来快速构建一个对象。伴生对象中,还有一个unapply方法。与apply相反,unapply是将该类的对象,拆解为一个个的元素。
要实现一个类的提取器,只需要在该类的伴生对象中实现一个unapply方法即可
语法格式
1 2 3 4 5 6 7 8 def unapply (stu:Student ):Option [(类型1 , 类型2 , 类型3. ..)] = { if (stu != null ) { Some ((变量1 , 变量2 , 变量3. ..)) } else { None } }
示例说明
创建一个Student类,包含姓名年龄两个字段
实现一个类的解构器,并使用match表达式进行模式匹配,提取类中的字段。
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.nicai.demo.tiquqiobject Demo19 { class Stu (var name:String ,var age:Int ) object Stu { def apply (name: String , age: Int ): Stu = new Stu (name, age) def unapply (stu :Stu ) = { val tuple =(stu.name,stu.age) Some (tuple) } } def main (args: Array [String ]): Unit = { val nicai = Stu ("nicai" ,55 ) val unit = nicai match { case Stu (name, age) => s"${name} :${age} " } println(unit) } }
泛型 scala和Java一样,类和特质、方法都可以支持泛型。我们在学习集合的时候,一般都会涉及到泛型。
定义一个泛型方法 在scala中,使用方括号来定义类型参数。
语法格式
1 2 3 def 方法名 [泛型名称](..) = { }
示例说明
用一个方法来获取任意类型数组的中间的元素
不考虑泛型直接实现(基于Array[Int]实现)
加入泛型支持
参考代码
不考虑泛型的实现
1 2 3 4 5 6 7 def getMiddle (arr:Array [Int ]) = arr(arr.length / 2 )def main (args: Array [String ]): Unit = { val arr1 = Array (1 ,2 ,3 ,4 ,5 ) println(getMiddle(arr1)) }
加入泛型支持
1 2 3 4 5 6 7 8 9 10 11 package com.nicai.demo.fanxingobject Demo20 { def getMid [T ](array: Array [T ])= array(array.length/2 ) def main (args: Array [String ]): Unit = { println(getMid(Array (1 , 2 , 3 ))) println(getMid(Array ("dd" , "uu" , "sss" ))) } }
##泛型类
scala的类也可以定义泛型。接下来,我们来学习如何定义scala的泛型类
定义 语法格式
定义一个泛型类,直接在类名后面加上方括号,指定要使用的泛型参数
指定类对应的泛型参数后,就使用这些类型参数来定义变量了
示例 参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.nicai.demo.fanxingobject Demo21 { case class Per [y] (name:y,age:y ) def main (args: Array [String ] ) : Unit = { val list = List ( Per ("NJJS" , 45 ), Per ("jsjj" , 789 ), Per (56456 , "SSS" ) ) println(list) } }
上下界 需求:
我们在定义方法/类的泛型时,限定必须从哪个类继承、或者必须是哪个类的父类。此时,就需要使用到上下界。
上界定义:
使用<: 类型名
表示给类型添加一个上界 ,表示泛型参数必须要从该类(或本身)继承
语法格式
示例说明
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package com.nicai.demo.fanxingobject Demo22 { class Per class Stu extends Per class Man extends Stu def m [t <: Stu ](a:Array [t] ) = println(a) def main (args: Array [String ]): Unit = { m(Array (new Stu )) } }
下界
上界是要求必须是某个类的子类,或者必须从某个类继承,而下界是必须是某个类的父类 (或本身)
语法格式
注意:
如果类既有上界、又有下界。下界写在前面,上界写在后面 (同时又上下界,可能会守不住,即范围之外的也可以)
示例说明
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.nicai.demo.fanxingobject Demo23 { class Per class Stu extends Per class Man extends Stu def m [T >: Stu ](a:Array [T ] ) = println(a) def main (args: Array [String ]): Unit = { m(Array (new Stu )) m(Array (new Per )) } }
协变 逆变 非变 spark的源代码中大量使用到了协变、逆变、非变,学习该知识点对我们将来阅读spark源代码很有帮助。
来看一个类型转换的问题:
1 2 3 4 5 6 7 8 9 10 11 class Pair [T ]object Pair { def main (args: Array [String ]): Unit = { val p1 = Pair ("hello" ) val p2:Pair [AnyRef ] = p1 println(p2) } }
如何让带有泛型的类支持类型转换呢?
非变
语法格式
默认泛型类是非变的
类型B是A的子类型,Pair[A]和Pair[B]没有任何从属关系
Java是一样的
协变 语法格式
类型B是A的子类型,Pair[B]可以认为是Pair[A]的子类型
参数化类型的方向和类型的方向是一致的。
逆变 语法格式
类型B是A的子类型,Pair[A]反过来可以认为是Pair[B]的子类型
参数化类型的方向和类型的方向是相反的
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Super class Sub extends Super class Temp1 [T ]class Temp2 [+T ]class Temp3 [-T ]def main (args: Array [String ] ) : Unit = { val a:Temp1 [Sub ] = new Temp1 [Sub ] val c: Temp2 [Sub ] = new Temp2 [Sub ] val d: Temp2 [Super ] = c val e: Temp3 [Super ] = new Temp3 [Super ] val f: Temp3 [Sub ] = e }
Actor并发编程 scala的Actor并发编程模型可以用来开发比Java线程效率更高的并发程序。我们学习scala Actor的目的主要是为后续学习Akka做准备。
Java并发编程的问题 在Java并发编程中,每个对象都有一个逻辑监视器(monitor),可以用来控制对象的多线程访问。我们添加sychronized关键字来标记,需要进行同步加锁访问。这样,通过加锁的机制来确保同一时间只有一个线程访问共享数据。但这种方式存在资源争夺、以及死锁问题,程序越大问题越麻烦。
思索问题
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.nicai.Demo;public class MyLock { public static Object obja = new Object(); public static Object objb = new Object(); } class DieLock extends Thread { private boolean flag; public DieLock (boolean flag) { this .flag=flag; } @Override public void run () { if (flag){ synchronized (MyLock.obja){ System.out.println("a" ); synchronized (MyLock.objb){ System.out.println("b" ); } } }else { synchronized (MyLock.objb){ System.out.println("bb" ); synchronized (MyLock.obja){ System.out.println("aa" ); } } } } public static void main (String[] args) { DieLock lock1 = new DieLock(true ); DieLock lock2 = new DieLock(false ); lock1.start(); lock2.start(); } }
Artor并发编程模型 Actor并发编程模型,是scala提供给程序员的一种与Java并发编程完全不一样的并发编程模型,是一种基于事件模型的并发机制。Actor并发编程模型是一种不共享数据,依赖消息传递的一种并发编程模式,有效避免资源争夺、死锁等情况。
java 并发编程 与Actor并发编程对比
Java内置线程模型
scala Actor模型
“共享数据-锁”模型 (share data and lock)
share nothing
每个object有一个monitor,监视线程对共享数据的访问
不共享数据,Actor之间通过Message通讯
加锁代码使用synchronized标识
死锁问题
每个线程内部是顺序执行的
每个Actor内部是顺序执行的
注意
scala在2.11.x版本中加入了Akka并发编程框架,老版本已经废弃。Actor的编程模型和Akka很像,我们这里学习Actor的目的是为学习Akka做准备。
创建Actor 创建Actor的方式和Java中创建线程很类似,也是通过继承来创建。
使用方式
定义class或object继承Actor特质
重写act方法
调用Actor的start方法执行Actor
类似于Java线程,这里的每个Actor是并行执行的
示例说明
创建两个Actor,一个Actor打印1-10,另一个Actor打印11-20
使用class继承Actor创建(如果需要在程序中创建多个相同的Actor)
使用object继承Actor创建(如果在程序中只创建一个Actor)
参考代码
使用class继承Actor创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 object _05ActorDemo { class Actor1 extends Actor { override def act (): Unit = (1 to 10 ).foreach(println(_)) } class Actor2 extends Actor { override def act (): Unit = (11 to 20 ).foreach(println(_)) } def main (args: Array [String ]): Unit = { new Actor1 ().start() new Actor2 ().start() } }
使用object继承Actor创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.nicai.demo.actorDemoimport scala.actors.Actor object Demo26 { object A1 extends Actor { override def act (): Unit = (1 to 10 ).foreach(println(_)+"," ) } object A2 extends Actor { override def act (): Unit = (11 to 20 ).foreach(print(_)+"," ) } def main (args: Array [String ]): Unit = { A1 .start() A2 .start() } }
Actor程序运行流程
调用start()方法启动Actor
自动执行act ()方法
向Actor发送消息
act方法执行完成后,程序会调用**exit()**方法
发送消息 与接收消息 我们之前介绍Actor的时候,说过Actor是基于事件(消息)的并发编程模型,那么Actor是如何发送消息和接收消息的呢?
使用方式 发送消息
我们可以使用三种方式来发送消息:
!
发送异步消息,没有返回值
!?
发送同步消息,等待返回值
!!
发送异步消息,返回值是Future[Any]
例如:
要给actor1发送一个异步字符串消息,使用以下代码:
接收消息
Actor中使用receive方法来接收消息,需要给receive方法传入一个偏函数
1 2 3 4 5 { case 变量名1 :消息类型1 => 业务处理1 , case 变量名2 :消息类型2 => 业务处理2 , ... }
注意:
receive方法只接收一次消息,接收完后继续执行act方法
示例说明
创建两个Actor(ActorSender、ActorReceiver)
ActorSender发送一个异步字符串消息给ActorReceiver
ActorReceive接收到该消息后,打印出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.nicai.demo.actorDemoimport java.util.concurrent.TimeUnit import scala.actors.Actor object Demo27 { object MsgSender extends Actor { override def act (): Unit = { MsgReceiver ! "nicai" TimeUnit .SECONDS .sleep(3 ) } } object MsgReceiver extends Actor { override def act (): Unit = { receive{ case msg: String => println(msg) } } } def main (args: Array [String ]): Unit = { MsgSender .start() MsgReceiver .start() } }
持续接收消息 通过上一个案例,ActorReceiver调用receive来接收消息,但接收一次后,Actor就退出了。
我们希望ActorReceiver能够一直接收消息,怎么实现呢?
——我们只需要使用一个while(true)循环,不停地调用receive来接收消息就可以啦
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.nicai.demo.actorDemoimport java.util.concurrent.TimeUnit import scala.actors.Actor object Demo27 { object MsgSender extends Actor { override def act (): Unit = { MsgReceiver ! "nicai" TimeUnit .SECONDS .sleep(3 ) } } object MsgReceiver extends Actor { override def act (): Unit = { receive{ case msg: String => println(msg) } } } def main (args: Array [String ]): Unit = { MsgSender .start() MsgReceiver .start() } }
使用loop和react 优化接收消息 上述代码,使用while循环来不断接收消息。
如果当前Actor没有接收到消息,线程就会处于阻塞状态
如果有很多的Actor,就有可能会导致很多线程都是处于阻塞状态
每次有新的消息来时,重新创建线程来处理
频繁的线程创建、销毁和切换,会影响运行效率
在scala中,可以使用loop + react来复用线程。比while + receive更高效
使用loop + react重写上述案例
参考代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 loop { react { case msg:String => println("接收到消息:" + msg) } } 改写: package com.nicai.demo.actorDemoimport java.util.concurrent.TimeUnit import scala.actors.Actor object Demo29 { object MsgSender extends Actor { override def act (): Unit = { while (true ){ MsgReceice ! "NICAII" TimeUnit .SECONDS .sleep(3 ) } } } object MsgReceice extends Actor { override def act (): Unit = { loop{ react{ case msg :String => println(msg) } } } } def main (args: Array [String ]): Unit = { MsgReceice .start() MsgSender .start() } }
发送和接收自定义消息 我们前面发送的消息是字符串类型,Actor中也支持发送自定义消息,常见的如:使用样例类封装消息,然后进行发送处理。
例子1
示例说明
创建一个MsgActor,并向它发送一个同步消息,该消息包含两个字段(id、message)
MsgActor回复一个消息,该消息包含两个字段(message、name)
打印回复消息
注意:
使用!?
来发送同步消息
在Actor的act方法中,可以使用sender获取发送者的Actor引用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import scala.actors.Actor object Demo30 { case class Msg (name:String ,Age :Int ) //封装回复消息 case class ReplyMsg (name:String ,addres:String ) //接收消息 object MsgActor extends Actor { override def act (): Unit = { loop{ react{ case Msg (name,age) =>{ println("收到消息" +s"${name} :${age} " ) sender ! ReplyMsg ("wobucai" ,"bbb" ) } } } } } def main (args: Array [String ]): Unit = { MsgActor .start() val unit:Any = MsgActor !? Msg ("nicai" ,22 ) if (unit.isInstanceOf[ReplyMsg ]){ println("回复消息" +unit.asInstanceOf[ReplyMsg ]) } } }
实例2
创建一个MsgActor,并向它发送一个异步无返回消息,该消息包含两个字段(message, company)
使用!
发送异步无返回消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import com.nicai.demo.actorDemo.Demo30 .Msg import scala.actors.Actor object Demo31 { case class Mag (name:String ,age:Int ) object MsgActor extends Actor { override def act (): Unit = { loop{ react{ case Msg (name,age) => { println(s"${name} :${age} " ) } } } } } def main (args: Array [String ]): Unit = { MsgActor .start() MsgActor ! Msg ("你猜" ,55 ) } }
例子3
创建一个MsgActor,并向它发送一个异步有返回消息,该消息包含两个字段(id、message)
MsgActor回复一个消息,该消息包含两个字段(message、name)
打印回复消息
注意:
使用!!
发送异步有返回消息
发送后,返回类型为Future[Any]的对象
Future表示异步返回数据的封装,虽获取到Future的返回值,但不一定有值,可能在将来某一时刻才会返回消息
Future的isSet()可检查是否已经收到返回消息,apply()方法可获取返回数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.nicai.demo.actorDemoimport scala.actors.Actor object Demo32 { case class Msg (name:String ,age: Int ) //封装返回消息 case class ReMsg (name:String ,age: Int ) //设置接收消息 object MsgActor extends Actor { override def act (): Unit = { loop{ react{ case Msg (name,age) =>{ println(s"${name} :${age} " ) sender ! ReMsg ("NICAI" ,4564 ) } } } } } def main (args: Array [String ]): Unit = { MsgActor .start() val unit = MsgActor !! Msg ("温暖你的空间" ,777 ) while (!unit.isSet){ } println(unit.apply().asInstanceOf[ReMsg ]) } }
##WordCount案例
我们要使用Actor并发编程模型实现多文件的单词统计
需求:
给定几个文本文件(文本文件都是以空格分隔的),使用Actor并发编程来统计单词的数量
实现思路
MainActor获取要进行单词统计的文件
根据文件数量创建对应的WordCountActor
将文件名封装为消息发送给WordCountActor
WordCountActor接收消息,并统计单个文件的单词计数
将单词计数结果发送给MainActor
MainActor等待所有的WordCountActor都已经成功返回消息,然后进行结果合并
步骤1 | 获取文件列表 实现思路
在main方法中读取指定目录(${project_root_dir}/data/)下的所有文件,并打印所有的文件名
实现步骤
创建用于测试的数据文件
加载工程根目录,获取到所有文件
将每一个文件名,添加目录路径
打印所有文件名
参考代码
1 2 3 4 5 6 7 8 val DIR ="day22Scala3/data/" val list = new File (DIR ).list().toList val pathAll = list.map(DIR + _) println(pathAll)
步骤2 | 创建WordCountActor 实现思路
根据文件数量创建WordCountActor,为了方便后续发送消息给Actor,将每个Actor与文件名关联在一起
实现步骤
创建WordCountActor
将文件列表转换为WordCountActor
为了后续方便发送消息给Actor,将Actor列表和文件列表拉链到一起
打印测试
参考代码
MainActor.scala
1 2 3 4 5 6 7 8 val wordCountList = list.map { fileNmae => new WordCountActor () } val tuplesList = wordCountList.zip(pathAll) println(tuplesList)
WordCountActor.scala
1 2 3 4 5 class WordCountActor extends Actor { override def act (): Unit = { } }
步骤3 | 启动Actor/发送/接收任务消息 实现思路
启动所有WordCountActor,并发送单词统计任务消息给每个WordCountActor
注意
此处应发送异步有返回消息
实现步骤
创建一个WordCountTask样例类消息,封装要进行单词计数的文件名
启动所有WordCountTask,并发送异步有返回消息
获取到所有的WordCount中获取到的消息(封装到一个Future列表中)
在WordCountActor中接收并打印消息
参考代码
MainActor.scala
1 2 3 4 5 6 7 8 9 10 tuplesList.map{ actorFileName =>{ val actor = actorFileName._1 actor.start() val future = actor !! Msg (actorFileName._2) future } }
MessagePackage.scala
1 2 3 4 5 6 case class Msg (name:String )
WordCountActor.scala
1 2 3 4 5 6 7 8 9 10 11 class WordCountActor extends Actor { override def act (): Unit = { loop{ react{ case Msg (fileName) => println("对" +fileName+"进行单词统计" ) } } } }
步骤4 | 消息统计文件单词计数 实现思路
读取文件文本,并统计出来单词的数量。例如:
1 (hadoop, 3), (spark, 1)...
实现步骤
读取文件内容,并转换为列表
按照空格切割文本,并转换为一个一个的单词
为了方便进行计数,将单词转换为元组
按照单词进行分组,然后再进行聚合统计
打印聚合统计结果
参考代码
WordCountActor.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class WordCountActor extends Actor { override def act (): Unit = { loop{ react{ case Msg (fileName) => println("对" +fileName+"进行单词统计" ) val wordLineList = Source .fromFile(fileName).getLines().toList val wordList = wordLineList.flatMap(_.split(" " )) val wordAndCountList = wordList.map(_ -> 1 ) val wordGroubList = wordAndCountList.groupBy(_._1) var wordSum=wordGroubList.map{ keyValue => keyValue._1 -> keyValue._2.map(_._2).sum } println(wordSum) } } } }
步骤5 | 封装单词计数结果回复给MainActor 实现思路
将单词计数的结果封装为一个样例类消息,并发送给MainActor
MainActor等待所有WordCount均已返回后获取到每个WordCountActor单词计算后的结果
实现步骤
定义一个样例类封装单词计数结果
将单词计数结果发送给MainActor
MainActor中检测所有WordActor是否均已返回,如果均已返回,则获取并转换结果
打印结果
参考代码
MessagePackage.scala
1 2 3 4 5 6 7 case class WordCountResult (wordSum:Map [String ,Int ] )
WordCountActor.scala
1 2 3 sender ! WordCountResult (wordSum)
MainActor.scala
1 2 3 4 5 6 7 8 while (futureList.filter(!_.isSet).size!=0 ){} val wordCountResultList = futureList.map(_.apply().asInstanceOf[WordCountResult ]) val stringToInts = wordCountResultList.map(_.wordSum) println(stringToInts)
步骤6 | 结果合并 实现思路
对接收到的所有单词计数进行合并。因为该部分已经在WordCountActor已经编写过,所以抽取这部分一样的代码到一个工具类中,再调用合并得到最终结果
实现步骤
创建一个用于单词合并的工具类
抽取重复代码为一个方法
在MainActor调用该合并方法,计算得到最终结果,并打印
参考代码
WordCountUtil.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def reduce (wordCountList:List [(String , Int )]) = { val grouped: Map [String , List [(String , Int )]] = wordCountList.groupBy(_._1) val wordCount: Map [String , Int ] = grouped.map { tuple => val word = tuple._1 val total = tuple._2.map(_._2).sum word -> total } wordCount }
MainActor.scala
1 2 3 4 val result: Map [String , Int ] = WordCountUtil .reduce(resultList.flatten)println("最终结果:" + result)