Fork channel

Create a new channel as a copy of main.

Rename channel

Rename main to:

Delete channel

Delete main? This cannot be undone.

ActorDataCache.kt
package com.github.jonathanxd.dracon.cache

import com.github.jonathanxd.dracon.coroutine.DRACON_SCHEDULER
import com.intellij.execution.process.mediator.util.blockingGet
import com.intellij.openapi.project.Project
import com.intellij.openapi.project.getProjectDataPath
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.trySendBlocking
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import kotlin.concurrent.withLock

/**
 * Denotes the current version of data cache, must be updated when data cache format changes.
 */
const val ACTOR_DATA_CACHE_VERSION: Long = 3

/**
 * Actor-based data cache storage.
 *
 * Values are served from an in-memory map; the map is persisted to the gzip file `<name>.gz`
 * under the project data path through an [ActorDataCacheManager]. Persisted entries are loaded
 * asynchronously after construction, so a lookup performed before the load finishes is treated
 * as a cache miss and falls back to the supplied resolver. This keeps slow on-file cache
 * reads/writes from freezing IntelliJ IDEA when data is requested from the UI thread.
 *
 * @param K key type
 * @param V cached value type
 * @property project project whose data path hosts the cache file
 * @property name base name of the cache file (without the `.gz` extension)
 */
class ActorDataCache<K, V>(
    val project: Project,
    val name: String
) {

    private val cachePath = project.getProjectDataPath("com.github.jonathanxd.dracon")
    private val cacheFile = cachePath.resolve("$name.gz")

    // Uses the version constant declared for this cache format, so bumping
    // ACTOR_DATA_CACHE_VERSION actually discards files written in an older layout.
    private val manager = ActorDataCacheManager<K, CachedValue<V>>(this.cacheFile, ACTOR_DATA_CACHE_VERSION)
    private val lock = ReentrantLock()
    private val inMemory = ConcurrentHashMap<K, CachedValue<V>>()

    init {
        this.manager.load { loaded ->
            // The load completes asynchronously, so entries may already have been computed and
            // stored in memory by then. Keep whichever entry carries the newest instant instead
            // of blindly overwriting fresh values with what was on disk.
            for ((key, stored) in loaded) {
                inMemory.merge(key, stored) { current, fromDisk ->
                    if (fromDisk.instant.isAfter(current.instant)) fromDisk else current
                }
            }
        }
    }

    /**
     * Executes [f] while holding this cache's lock and returns its result.
     */
    fun <R> withLock(f: () -> R): R = this.lock.withLock(f)

    /**
     * Removes [key] from memory and schedules a write of the remaining entries to disk.
     */
    fun unload(key: K) {
        this.lock.withLock {
            this.inMemory.remove(key)
        }
        writeToDisk()
    }

    /**
     * Returns the cached value for [key]. On a miss, [compute] is invoked (while holding the
     * lock), its result is stored in memory, a disk write is scheduled and the result is returned.
     */
    fun queryOrLoad(key: K, compute: (K) -> V): V =
        this.lock.withLock {
            val cached = this.inMemory[key]
            if (cached != null) {
                cached.value
            } else {
                val computed = CachedValue(Instant.now(), compute(key))
                this.inMemory[key] = computed
                writeToDisk()
                computed.value
            }
        }

    /**
     * Variant of [queryOrLoad] for callers whose working type [I] differs from the stored type [V].
     *
     * @param key key to look up
     * @param compute produces the value on a cache miss
     * @param filter decides whether a freshly computed value is cached; rejected values are
     * returned to the caller without being stored
     * @param iMapper converts a computed value into its stored representation
     * @param vMapper converts a stored value back into the caller's representation
     * @return the cached value mapped through [vMapper], or the freshly computed value
     */
    fun <I> queryOrLoad(key: K,
                        compute: (K) -> I,
                        filter: (I) -> Boolean,
                        iMapper: (I) -> V,
                        vMapper: (V) -> I): I =
        this.lock.withLock {
            val cached = this.inMemory[key]
            if (cached != null) {
                vMapper(cached.value)
            } else {
                val computed = compute(key)
                if (filter(computed)) {
                    // `filter` alone decides what is cached; `iMapper` accepts any `I`,
                    // so no non-null assertion is forced on the computed value.
                    this.inMemory[key] = CachedValue(Instant.now(), iMapper(computed))
                    writeToDisk()
                }
                computed
            }
        }

    /**
     * Hands the in-memory map to the manager for persistence.
     *
     * This call completes asynchronously.
     */
    private fun writeToDisk() {
        this.manager.write(this.inMemory)
    }

    /**
     * Drops every in-memory entry and schedules a disk write.
     *
     * @see CacheService.invalidate
     */
    fun invalidate() {
        this.lock.withLock {
            this.inMemory.clear()
            this.writeToDisk()
        }
    }

    /**
     * Drops the entry for [key] and schedules a disk write.
     *
     * @see CacheService.invalidate
     */
    fun invalidate(key: K) {
        this.lock.withLock {
            this.inMemory.remove(key)
            this.writeToDisk()
        }
    }

    /**
     * Drops the entries for all [keys] and schedules a single disk write.
     *
     * @see CacheService.invalidateAll
     */
    fun invalidateAll(keys: Iterable<K>) {
        this.lock.withLock {
            keys.forEach { this.inMemory.remove(it) }
            this.writeToDisk()
        }
    }

    /**
     * Replaces the entry for [key] with the result of [newValueCompute], unless the existing
     * entry is not older than the instant captured when this call started.
     *
     * NOTE(review): unlike the other mutators this does not schedule a disk write — confirm
     * that this is intended.
     *
     * @see CacheService.updateCache
     */
    fun updateCache(key: K, newValueCompute: () -> V) {
        val instant = Instant.now()
        this.lock.withLock {
            val current = this.inMemory[key]
            if (current == null || current.instant.isBefore(instant)) {
                this.inMemory[key] = CachedValue(instant, newValueCompute())
            }
        }
    }

    /**
     * Bulk variant of [updateCache]: every key in [keys] is refreshed with [newValueCompute]
     * under the same rule, all sharing the instant captured when this call started.
     *
     * NOTE(review): does not schedule a disk write either — confirm that this is intended.
     *
     * @see CacheService.updateCache
     */
    fun updateCache(keys: List<K>, newValueCompute: (K) -> V) {
        val instant = Instant.now()
        this.lock.withLock {
            for (key in keys) {
                val current = this.inMemory[key]
                if (current == null || current.instant.isBefore(instant)) {
                    this.inMemory[key] = CachedValue(instant, newValueCompute(key))
                }
            }
        }
    }
}

/**
 * Messages accepted by the cache actor. Each message carries a [CompletableDeferred] that the
 * receiver completes to report the outcome back to the sender.
 */
sealed class CacheCommand<K, V> {

    /**
     * Request to read the cache from storage.
     *
     * @property completion receives the loaded map, or `null` when nothing could be loaded
     */
    class CacheLoad<K, V>(
        val completion: CompletableDeferred<Map<K, V>?>
    ) : CacheCommand<K, V>()

    /**
     * Request to write [data] to storage.
     *
     * @property data entries to persist
     * @property completion completed by the receiver when the save has been processed
     */
    class CacheSave<K, V>(
        val data: Map<K, V>,
        val completion: CompletableDeferred<Unit>
    ) : CacheCommand<K, V>()
}

/**
 * Owns the on-disk cache file at [path] and funnels every read and write through a single
 * actor, so file operations are processed one at a time and off the caller's thread.
 *
 * Work is done asynchronously on purpose: cache storage is very slow due to compression, so
 * callers are expected to accept a cache MISS while a load or save is still in flight.
 *
 * File layout (inside a gzip stream, via Java object serialization): a `Long` format version
 * followed by the serialized map.
 *
 * @property path location of the gzip cache file
 * @property version expected format version; a file written with a different version is
 * reported as an empty cache
 */
class ActorDataCacheManager<K, V>(val path: Path, val version: Long) {

    private val loader = CoroutineScope(Dispatchers.IO).actor<CacheCommand<K, V>> {
        for (msg in channel) {
            when (msg) {
                is CacheCommand.CacheLoad<K, V> -> msg.completion.complete(readCache())
                is CacheCommand.CacheSave<K, V> -> {
                    try {
                        writeCache(msg.data)
                        // Signalled only after every stream is closed, i.e. after the gzip
                        // trailer has been written and the file is complete.
                        msg.completion.complete(Unit)
                    } catch (t: Throwable) {
                        // A failed save must neither kill the actor loop nor leave the sender
                        // waiting forever: forward the failure through the completion instead.
                        msg.completion.completeExceptionally(t)
                    }
                }
            }
        }
    }

    init {
        // A path without a parent has no directory to create.
        this.path.parent?.let { Files.createDirectories(it) }
    }

    /**
     * Reads the cache file.
     *
     * @return `null` when the file is missing or unreadable, an empty map when the stored
     * version differs from [version], otherwise the stored entries
     */
    @Suppress("UNCHECKED_CAST")
    private fun readCache(): Map<K, V>? {
        if (!Files.exists(path)) {
            return null
        }

        return try {
            Files.newInputStream(path).use { stream ->
                GZIPInputStream(stream).use { gz ->
                    ObjectInputStream(gz).use { reader ->
                        val storedVersion = reader.readLong()
                        if (storedVersion != version) {
                            emptyMap<K, V>()
                        } else {
                            reader.readObject() as Map<K, V>
                        }
                    }
                }
            }
        } catch (t: Throwable) {
            // Best effort: any failure while reading is treated as a miss and the unreadable
            // file is discarded so the next save starts from scratch.
            t.printStackTrace()
            try {
                Files.deleteIfExists(path)
            } catch (e: Exception) {
                // Failing to remove the broken file must not take the actor down.
                e.printStackTrace()
            }
            null
        }
    }

    /**
     * Writes [version] followed by [data] to the cache file, replacing previous content.
     */
    private fun writeCache(data: Map<K, V>) {
        Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING).use { out ->
            GZIPOutputStream(out).use { gz ->
                ObjectOutputStream(gz).use { oos ->
                    oos.writeLong(version)
                    oos.writeObject(data)
                }
            }
        }
    }

    /**
     * Asynchronously loads the cache and passes the result to [onLoad].
     *
     * [onLoad] is not invoked when there is nothing to load (missing or unreadable file).
     */
    fun load(onLoad: suspend (Map<K, V>) -> Unit) {
        CoroutineScope(DRACON_SCHEDULER).launch {
            val complete = CompletableDeferred<Map<K, V>?>()
            loader.send(CacheCommand.CacheLoad(complete))
            val load = complete.await()
            if (load != null) {
                onLoad(load)
            }
        }
    }

    /**
     * Asynchronously writes [data] to the cache file.
     *
     * @param data entries to persist
     * @param onComplete invoked after the file has been written successfully; it is not invoked
     * when the save fails (the failure is printed instead)
     */
    fun write(data: Map<K, V>, onComplete: suspend () -> Unit = {}) {
        CoroutineScope(DRACON_SCHEDULER).launch {
            val complete = CompletableDeferred<Unit>()
            loader.send(CacheCommand.CacheSave(data, complete))
            try {
                complete.await()
            } catch (e: CancellationException) {
                throw e
            } catch (t: Throwable) {
                t.printStackTrace()
                return@launch
            }
            onComplete()
        }
    }
}