たかぎとねこの忘備録

プログラミングに関する忘備録を自分用に残しときます。マサカリ怖い。

Coroutineを学んでみた

CoroutineはノンブロッキングプログラミングをKotlinで使えるようにした機能。非同期操作を安全に扱える機能などが含まれている。

導入

Coroutineを使うにはkotlinx.coroutinesというパッケージを使わないといけない。

build.gradle.ktsファイルを開いて、dependenciesのブロックにimplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")を追加する。

// build.gradle.kts

dependencies {
    testImplementation(kotlin("test"))
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
}

pluginsブロックにkotlin("jvm") version "1.6.21"が書かれていることを確認する。

// build.gradle.kts

plugins {
    kotlin("jvm") version "1.6.21"
    application
}

pluginsブロックで、Kotlinの最新バージョンが使われていることを確認する。

plugins {
    kotlin("jvm") version "1.6.21"
}

ノンブロッキング

Coroutineを利用するにはsuspendを使ってsuspend関数を定義しないといけない。

そして、coroutineScopeを使用してCoroutineの利用範囲を指定して、内部で、launch()にラムダを渡す。そのラムダの内部では別のsuspend関数を呼び出すことができる。

main関数にもsuspendを付与することを忘れない。

import kotlinx.coroutines.*

suspend fun main(args: Array<String>) {
    coroutineTest()
}

suspend fun coroutineTest() = coroutineScope {
    launch {
        coroutineTest2()
    }
    println("Hello World")
}

suspend fun coroutineTest2() {
    println("Nice to meet you")
}

コルーチンビルダーはcoroutineScopeも併せて3つある。

  • coroutineScope
    • ブロック内で起動したコルーチンが完了するのを待たない
  • runBlocking
    • ブロック内で起動したコルーチンが完了するまで処理を待つ。
  • withContext
    • 指定したコンテキストに実行中の処理を切り替える

runBlockingを使う際はsuspendを使用しないで定義する。

import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking {
    println("Start Coroutine")
    launch {
        println("before 200")
        delay(200)
        println("after 200")
    }

    println("Finished Coroutine")
}

Coroutineの実行に使用されるスレッドをlaunchに引数を渡すことで指定できる。これによりlaunchのラムダにあるsuspend関数をノンブロッキングで実行することができる。

このときに渡す引数のことをディスパッチャーという。ディスパッチャーはDispatchersを通して参照できる。

  • Dispatchers.Default
    • 指定されない場合に使用されるディスパッチャー
  • Dispatchers.Main
    • OSのメインスレッド
  • Dispatchers.Unconfined
    • 実行時に最初に使用可能なスレッドを割り当てる
  • Dispatchers.IO
    • データベースへのリクエストやファイルへのアクセスなどに使われる
import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking {
    println("Start Coroutine")
    launch(Dispatchers.IO) {
        println("before 200")
        delay(200)
        println("after 200")
    }

    println("Finished Coroutine")
}

タイムアウトを設定できる。タイムアウトで設定した時間を超えた場合はTimeoutCancellationExceptionがスローされる。

import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking {
    println("Start Coroutine")
    launch {
        timeoutTest()
    }
    println("Finished Coroutine")
}

suspend fun timeoutTest() = coroutineScope{
    withTimeout(timeMillis = 1_000) {
        println("before 200")
        delay(2_000)
        println("after 200")
    }
}

Job

launchの戻り値であり、この戻り値を使えばCoroutineを制御できる。 例えば、キャンセルすることもできる。

import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking {
    println("Start Coroutine")
    val job = launch(Dispatchers.IO) {
        println("before 200")
        delay(200)
        println("after 200")
    }
    job.cancel()
    println("Finished Coroutine")
}

async

Coroutineで並列処理を行う場合は、async()を使ってCoroutineを作成する。そして、async()を使って作成したCoroutineの結果を受け取る場合は戻り値に対してawait()を呼び出す。await()を使うことで、スレッドをブロッキングすることなく処理の完了を待つことができる。

import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking {
    testAsync()
}

suspend fun returnNum(num: Int): Int {
    return num
}

suspend fun testAsync() = coroutineScope {
    val a = async { returnNum(5) }
    val b = async { returnNum(10) }
    println("a: ${a.await()}")
    println("b: ${b.await()}")
}

ChannelとFlow

スレッド間でデータの受け渡しを行う方法として主にChannelFlowがある。

Channelにおいて、データを送信する場合はsend()を利用する。受信する場合はreceive()を利用する。

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        sendNum(channel, 200)
    }

    launch {
        val result = channel.receive()
        println(result)
    }
}

suspend fun sendNum(ch: Channel<Int>, num: Int): Unit {
    delay(300)
    ch.send(num)
}

Flowでは、データを送信する場合にemit()を呼び出す。そして受信する際は、flow.collectにラムダを渡して、その中でitを経由して送信されたデータを参照する。

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flow

fun main(args: Array<String>) = runBlocking<Unit> {
    val flow = flow {
        val result = returnNum(200)
        emit(result)
    }
    flow.collect {
        println(it)
    }
}

suspend fun returnNum(num: Int): Int {
    delay(300)
    return num
}