fkm blog

software開発に関することを書いていきます

Google I/O 2019: Understand Kotlin Coroutines on Android

Kotlin系いきます。

youtu.be

理想的なコード

こんなコード、書けるといいよね。

val user = fetchUser() // ネットワークからユーザー情報をとってくる
textView.text = user.name

でも、当然ながらUIスレッドでこんなコード書くと、 fetchUser()NetworkOnMainThreadException 投げちゃうので、だめ。

じゃあこうするとどう?

thread {
    val user = fetchUser() // ネットワークからユーザー情報をとってくる
    textView.text = user.name
}

今度は textView への変更で CalledFromWrongThreadException 投げちゃう。

これならどうだ。

fetchUser { user -> 
    textView.text = user.name
}

これならOK。コールバックのリークがなければ。

Subscriptionを使う例

コールバック方式の場合、途中でとめる仕組みがないとアクティビティが停止しても処理が進んじゃう。そのためこんなコードを書くよね。

val subscription = fetchUser { user ->
    textView.text = user.name
}

override fun onStop() {
    super.onStop()
    subscription.cancel()
}

非同期処理が1つならいいけど、これが複数になると、 onStop() がこんな感じになっちゃう。

override fun onStop() {
    super.onStop()
    subscription.cancel()
    subscription2.cancel()
    subscription3.cancel()
    subscription4.cancel()
    subscription5.cancel()
}

これをなんとかしたいので、 Architecture Componentが誕生した。

RxJava way

RxJavaを使う方法もある。

fun fetchUser(): Observable<User> = ...

fetchUser()
    .as(autoDisposable(AndroidLifecycleScopeProvider.from(this))
    .subscribe { user -> 
        textView.text = user.name
    }

LiveData way

LiveDataだと、こんな感じで書ける。

fun fetchUser(): LiveData = ...

fetchUser().observe(viewLifecycleOwner) { user -> textView.text = user.name }

解決済みの問題?

現状、並行性に関する問題の解法は3つ。

  • LiveData: Observable Data holder
  • RxJava: Observable + Scheduler + Observer
  • Coroutines: 中断可能な計算

LiveDataは「いいけど、より完璧なソリューション頼む」、RxJavaは「パワフルだけど、時に間違った使われ方をしたり、オーバーキルすぎたりする」

コルーチンは「ベストっぽいが、エクステンションの充実と学習曲線がなぁ」

Jetpackでは、コルーチンをメインとして、RxJavaはドキュメントでのサポートとするらしい。

コルーチン

コルーチンとは、コールバックスタイルを置き換える、非同期処理をシンプルにしてくれるもの。

例えば、こんなコードがあったとする。

fun loadUser() {
    val user = api.fetchUser()
    show(user)
}

もちろんこのコードは (apiの実装にもよるが)UIスレッドをブロックしてしまったり、クラッシュしたりする。

コールバックスタイルだと、こんな感じ。

fun loadUser() {
    api.fetchUser() { user -> 
        show(user)
    }
}

コルーチンだと、こんな感じになる。

suspend fun loadUser() {
    val user = api.fetchUser()
    show(user)
}

ブロッキングスタイルだけど、UIをブロックしたりしない。

fetchUser() は、こんなメソッドとして定義される。

suspend fun fetchUser() {
    withContext(Dispatchers.IO) {

    }
}

Kotlinでは、Dispatcherとして - Default: 計算用 - IO: ディスクやネットワークアクセス用 - Main: Androidのメイン(UI)スレッド が用意されている。

withContext() はUIスレッドで呼んでも大丈夫。

Kotlinではどうやってる?

※ここは動画みたほうがわかりやすいかも。

suspend な関数が呼ばれると、スタックにマーカーを置く。マーカーを置いてから、通常の関数呼び出しのように呼び出し位置をスタックに積む。そして関数内部で別の suspend な関数が呼ばれると、マーカーまでを避難させて処理を続ける。呼んだ位置がちゃんと退避されているので、Dispatcherでの処理が終わった時点でちゃんとスタックの復元ができるという仕組み。

ライブラリの対応

WorkManager

work-runtine-ktx:2.0.0 ですでにコルーチンサポート済み。こんな感じで使う

class UploadNotesWorker(...): CoroutineWorker(...) {
    override suspend fun doWork(): Result {
        val newNotes = db.queryNewNotes()
        noteService.uploadNotes(newNotes)
        db.markAsSynced(newNotes)
        return Result.success()
    }
} 

LiveData

LiveData:2.2.0-alpha01 で、 liveData が追加された。中身はコルーチンブロックなので suspend 関数を呼ぶことができる。

// user: LiveData<User>
// なおこのままだとメインスレッドでDBを読むので例外になっちゃう
val user = liveData {
    emit(db.load(userId))
}

Dispatcherを指定することもできる。

// user: LiveData<User>
val user = liveData(Dispatchers.ID) {
    emit(db.load(userId))
}

タイムアウトも指定できる。なぜタイムアウトの指定があるのか?それは画面回転にともなうLiveDataのunsubscribeとsubscribeによるもの。画面回転すると、今の画面は使えないのでLiveDataに対してはunsubscribeが行われる。LiveData側からすると、subscribeしてるものがなくなるので処理を中止してよいことになってしまう。そして再度subscribe(通常だと1秒未満)が行われれ、処理が最初から行なわれる という事態が発生してしまう。これではイケてないのでタイムアウトの概念が導入されている。

また、 emit() は何度も呼んでよい。なのでこんな感じのコードが書ける。

val user = liveData {
    emit(db.load(userId)) // まずDBにあるのを返して
    val u = api.fetch(userId) // ネットワークからとってきて
    db.insert(u) // それをDBにいれて
    emit(u) // 最新のものとして返す
}

もし db.load(userId) がRoom経由のLiveDataを返す場合、DBの監視機能がついてるので最後の emit() がいらなくなる(insertした時点で db.load() が返したLiveDataに最新のデータがやってくる)。そんなケースの場合は emitSource() を使う。

val user = liveData {
    emitSource(db.load(userId)) // まずDBにあるのをLiveDataで返して
    val u = api.fetch(userId) // ネットワークからとってきて
    db.insert(u) // それをDBにいれる
}

ViewModel

コルーチンは油断するとリークしてしまう(必要ないのに処理を続けてしまう)ので、Kotlinでは「スコープ」が導入されている。すべてのコルーチンはスコープの中で実行されないといけない。スコープは中で実行しているコルーチンを必要なときに全部キャンセルできる。

ViewModelもコルーチンスコープになった。ViewModeの中で次のようなコードが書ける。

viewModelScope.launch {
    // UIスレッドで実行される
    while(true) {
        delay(1_000)
        writeFile()
    }
}

画面遷移などでViewModelが破棄されるときにコルーチンもキャンセルされる。

Lifecycle

ライフサイクルオーナーもコルーチンスコープとなるよう修正が入った。

LifecycleOwner.lifecycleScope: CoroutineScope

なので、 ActivityFragment の中で次のようなコードが書けるようになった。

fun loadUser() {
    this.lifecycleScope.launch {  
        ...
    }
}

UIに関連した非同期処理がとても書きやすくなる。例えば画面が表示されてから、1秒待ってヒントを出すといった際、こんな感じで書ける。

override fun onStart() {
    super.onStart()
    this.lifecycleScope.launch {
        delay(1_000) // 1秒待って
        showFullHint()
        delay(1_000) // さらに1秒待って
        showSmallHint()
    }
}

画面回転とかでキャンセルされる!というのを忘れてはいけない。画面回転しても継続したほうがいいような処理はViewModelで。

また、lifecycleScope.launch はフラグメントの状態がどうなってるかわからないので、非同期処理後に画面遷移といったコードを書くと、タイミングによっては IllegalStateException が発生しちゃう。そのため、 launchWhenStarted というのが追加されている。

override fun onCreate() {
    super.onCreate()
    this.lifecycleScope.launchWhenStarted {
        val note = userViewModel.loadNote()
        fragmentManager.beginTransaction()...commit()
    }
}

この場合、途中で suspend な関数呼び出しから戻ってきたタイミングで状態が STARTEDになっているかのチェックが入る。PAUSEDとかになっていたら続きの処理は行なわれない。

テストについて

kotlinx-coroutines-test というライブラリがある。まだexperimental。特定のテストツールに依存してるわけじゃないので、JUnit 4とか5とか、お好きなのをどうぞ。

例えばこんなクラスを考えてみよう。

class Repository {
    val liveData = liveData {
        emit(1)
        delay(1_000)
        emit(2)
    }
}

テストコードはどうするか。まずはスコープを作る。

val testDispatcher = TestCoroutineDispatcher()
val testScope = TestCoroutineScope(testDispatcher)

で、beforeとafterに設定を追加。

@Before
fun setup() {
    Dispatchers.setMain(testScope)
}

@After
fun tearDown() {
    Dispatchers.resetMain()
    testScope.cleanupTestCoroutines()
}

テストは、こんな感じで書く。JUnit4のRuleを使ってる。

@get:Rule
val testCoroutineRule = TestCoroutineRule()

@Test
fun test01 = testCoroutineRule.runBlockingTest {
    val subject = repository.liveData
    subject.observeForTesting {
        subject.value shouldEqual 1
        advanceTimeBy(1_000)
        subject.value shouldEqual 2
    }
}

observeForTesting はライブラリで提供されている関数ではなく、こんな感じで自作したもの。

fun <T> LiveData<T>.observeForTesting(
    block: () -> Unit
) {
    val observer = Observer<T> { Unit}
    try {
        observerForever(observer)
        block()        
    } finally {
        removeObserver(observer)
    }
}