目录

Background

简单写下scala中的Future以及对Thread的认识

java和scala中都有Future,那么这两个Future有什么不同呢?Thread是怎么样的,它的状态是如何变化的呢?一些操作比如sleep会涉及到锁么?

Java Future

java中Future类中方法很简单,也很少.

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

比较常用的就是get,可以设置超时时间。下面是使用java Future的一个场景,通常是使用线程池submit Callable。记得线程池要放在finally模块关闭。

  def testJavaFuture(): Unit = {
    val call = new Callable[Long] {
      override def call(): Long = {
        Thread.sleep(10000)
        123L
      }
    }
    val pool = Executors.newFixedThreadPool(1)
    try {
      val f = pool.submit(call)
      println(f.isDone)
      println(f.get(6, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }

Scala Future

相较于java的Future,scala的Future中方法很丰富。而且scala中伴生对象的apply方法使得创建一个Future非常方便.例如:

   val f: Future[String] = Future {
     " future!"
   }

介绍其中几个方法,用法写在注释中,println结果也在注释中。

import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Success
import scala.util.control.NonFatal

object TestScalaFuture {
  implicit val executionContext = ExecutionContext.global
  def createIntFuture(value: Int): Future[Int] = {
    Future {
      value
    }
  }

  def createIntFutureWithFailure(value: Int): Future[Int] = {
    Future {
      1 / 0
      value
    }
  }

  /**
   * recover方法是在Future发生异常时的处理。
   */
  def testRecover(): Unit = {
    val f1 = createIntFutureWithFailure(1).recover {
      case e: Exception =>
        -1
    }
    println(Await.result(f1, Duration("10s")))  // -1
  }

  /**
   * 将两个Future zip到一起,这样就只需要使用一个Await就可以等结果。
   */
  def testZip(): Unit = {
    val f1 = createIntFuture(1)
    val f2 = createIntFuture(2)
    println(Await.result(f1.zip(f2), Duration("10s"))) // (1,2)
  }

  /**
   * 功能类似于zip,是处理更多个,需要指定CanBuildFrom。
   */
  def testSequence(): Unit = {
    val f1 = createIntFuture(1)
    val f2 = createIntFuture(2)
    val f3 = createIntFuture(3)
    implicit val cbf = implicitly[CanBuildFrom[Iterator[Future[Int]], Int, Iterator[Int]]]
    val r = Await.result(Future.sequence(Iterator(f1, f2, f3)), Duration("10s"))
    r.foreach(v => print(v + "\t")) // 1 2 3
  }

  /**
   * 这里的map, flatMap等操作是对返回值进行的操作,也是lazy的。
   * 这里的andThen不会改变返回值。
   * Transform是对返回值进行的操作,以及对异常的转换。
   */
  def testMisc(): Unit = {
    val f1 = createIntFuture(1).map(_ * 7).map(_ + 1)
    println(Await.result(f1, Duration("10s") )) // 8
    val f2 = createIntFuture(2).andThen {
      case Success(v) if v ==2 =>
        println("the value is 2") // 这里只是执行一些操作,但是不会改变Future的返回值
      case _ =>
    }
    println(Await.result(f2, Duration("10s"))) // 2
    val f3 = createIntFuture(3).transform(v => "str:" + v, throwable => throwable match {
      case NonFatal(throwable) => new Exception(throwable)
      case _ => throwable
    })
    println(Await.result(f3, Duration("10s"))) // str:3
  }
}

Thread

Thread类实现了Runnable,是一个特殊的Runnable类。

一个线程代表一个程序的执行。jvm允许一个应用并发执行多个线程。每个线程都有一个优先级,优先级高的线程相对于优先级低的线程,更容易被执行。每个线程都可能被标记为一个守护(daemon)线程。当一个线程创建了一个新的线程,这个新的线程的优先级初始化为和创建它的线程一样。

当一个JVM 启动时,通常是只有一个非守护线程。JVM会一直运行直到:

  • exit方法被调用,并且允许exit。
  • 所有非守护线程都已经结束,可以是正常返回结束也可以是异常结束。

有两种方法生成一个新的执行线程。

一种是继承Thread类,overwrite run方法,然后start。

另一种是继承Runnable类,实现run方法,然后 new Thread(runnable).start.

线程的优先级分为1,5,10。1是所允许的最低优先级,5是默认分配,10是能够拥有的最高优先级。

Thread类里面提供了一些静态工具方法. Deprecated的方法不再列出.

Thread状态

首先,thread的五种状态.

  • NEW 线程被创建,还没start
  • RUNNABLE 在JVM上运行,可能在等操作系统的资源,比如时间片
  • BLOCKED 阻塞状态,等待lock来进入同步代码块
  • WAITING
    • Object.wait 没有指定timeout
    • 因为Thread.join 无timeout等待
    • LockSupport.park()无限期等待
  • TIMED_WAITING 有timeout的WAITING
    • Object.wait(long)
    • Thread.join(long)
    • LockSupport.parkNanos LockSupport.parkUntil
  • TREMINATED 线程退出

Thread 方法解析

yield

yield方法是给调度器一个hint表明自己自愿放弃当前的处理器,调度器可以忽略这个hint。这个方法不推荐,很少使用,可以用于避免cpu过度利用,但是使用之前要做好详细的分析与benchmark。spark项目中没有用到过yield.

sleep

sleep方法比较常用,这是将当前线程放弃执行,休眠一段时间,但是sleep不会放弃自己得到的monitor.

sleep(0)的意思代表是,大家所有线程重新抢占一下处理器。

threadGroup

在创建thread时候可以传入threadGroup参数。如果没有传入group,如果该线程指定了securityManager,则去问securityManager拿group,最终是拿currentThread的group,如果没指定securityManager,则和父线程一组。

start

开始运行线程,jvm调用run()方法。一个线程只能启动一次,否则会报IllegalThreadStateException。

run

实现的Runnable的run方法,用于让jvm调用

interrupt

如果是线程自己interrupt自己,是允许的,否则,需要securityManager进行checkAccess,可能会抛出SecurityException。

interrupt之后会加一个标志位interrupted.

如果此时该线程被 wait, join, sleep, 那么这个interrupted标志位会被清除然后抛出InterruptedException.

如果线程被java.nio.channels.InterruptibleChannel的I/O操作阻塞,那么这个channel将被关闭,然后set interrupted标志位,这个线程会收到一个ClosedByInterruptException.

如果线程被java.nio.channels.Selector阻塞,那么将会设置interrupted标志位,并马上从selection操作返回。

如果上述情况都没发生,那么这个线程设置interrup状态标志位.

如果线程已经dead,interrupt操作没丝毫作用,也不会出错。

isInterrupted

查看是否被设为interrupted.

setPriority

改变线程的优先级。首先会由securityManager进行校验,校验失败抛SecurityException. 校验成功,则取设置的值和当前threadGroup的最大权限中的较小值,作为线程的优先级。

getPriority

获得线程优先级.

setName, getName

设置线程名字,获取线程名字

getThreadGroup

获得线程的threadGroup

activeCount

获得当前线程的threadGroup以及subGroup中的线程数.由于线程在动态变化,因此只是一个估计值,主要是用于debug以及monitoring.

join(time)

等待线程结束,如果join(0)代表一直等待。如果该线程被其他thread interrupt,那么这个线程的interrupted标志位被清除,然后抛出InterruptedException.

dumpStack

打印当前线程的栈,只用于debug.

setDaemon(isDaemon)

设为守护线程或者用户线程。JVM会在所有用户线程都挂掉之后退出。

必须在线程启动之前设置,如果线程已经是alive,会抛IllegalThreadStateException.同样也会检查SecurityManager当前线程是否有权限去设置。

isDaemon

是否是守护线程

checkAccess

检查当前线程有没有权限去修改这个线程。

getContextClassLoader, setContextClassLoader

classLoader是用于加载classes和resources。默认的classLoader是父线程的classLoader。原始的线程classLoader通常是设置为应用的classLoader。如果classLoader不为空, 且securityManager不为空,将会进行权限校验。权限校验几乎伴随thread的每个操作,后面就不再提了.

holdsLock(Object obj)

线程是否持有某个monitor.

getStackTrace, getAllStackTraces

一个是打印当前线程的stack,一个是所有线程的stack,用户debug

getId

获得线程Id

getState

获得线程状态

UncaughtExceptionHandler

是一个接口,用于当线程由于一些未捕获的异常而导致终止时的处理。

里面只有一个方法.

 void uncaughtException(Thread t, Throwable e);

get(set)DefaultUncaughtExceptionHandler, get(set)UncaughtExceptionHandler

关于设置UncaughtExceptionHandler。ThreadGroup是UncaughtExceptionHandler的一个实现类,如果当前thread没有设置UncaughtExceptionHandler,那么返回threadGroup。