Поиск по этому блогу

пятница, 2 августа 2019 г.

Реализация очереди задач в Android

Бывают такие задачи в которых приходится распределять работу функционала, что бы каждая задача выполнялась после окончания предыдущей. В таких случаях обычно пихают внутри колбека об окончании процесса что бы стартовал новый процесс, это выглядит не очень красиво, сильно грубо и нагромождено. 

image

В таких случаях обычно стараются использовать такую штуку как очередь. Она помогает поставить процесс понятно и легкоизменяемо, каждый просесс выполняется в отдельном потоке, отдельном классе, и никак не задевает работу следующего или предыдущего процесса, все предельно понятно и логично.
В моем примере будет пример очереди в которой будет три этапа:
— CREATE — это будет какой-нибудь этап на пример создания видео файла или чего-то.
— WORK — далее мы образно говоря делаем какие-то действия над ним, режем файл, меняем кодек или еще что-то и отправляем на сервер.
— CLEAN — стираем все файлы которые были созданы с памяти девайса.

Это очень абстрактно говоря тот флоу который я буду сегодня тут описывать. В реальности у нас не будет никаких действий над файлами, я просто добавлю задержку между процессами что бы показать что там что-то происходит. 

Визуально выглядеть это все будет на экране как два текстовых поля которые показывают задержку между процессами и текущий процесс который выполняется. По нажатию на кноку у нас стартует очередь.

Давайте для начала решим какие статусы у нас будут, я их уже описал выше, теперь нам надо их перенести в енам.

Statuses.kt
object Statuses {
    const val CREATE = "create"
    const val WORK = "work"
    const val CLEAN = "clean"
}
Как я и говорил выше, три статуса, они у нас будут описывать какие процессы у нас будут в очереди.

Далее нам нужно создать модель с которой у нас будет происходить сбор и получение данных, и в которой у нас будет хранится статусы и стейты процессов. Обычно это нужно делать где нибудь в БД, для того что бы эта информация хранилась постоянно и к ней можно было обратиться в случае ошибки или сбоя в очереди. Но я для упрощения примера решил это все хранить только во время жизни приложения, по этому все будем хранить в модельке. 

MetaDataModel.kt
import dajver.com.taskqueueexample.queue.enums.Statuses

class MetaDataModel(var id: Int?) {

    var state: String? = Statuses.CREATE

    var statusReason: String? = null

    var stateBeforeFail: String? = null
}
Данный класс у нас хранит в себе всю самую важную информацию касательно процесса, но так же в нем может быть информация касательно какого-то на пример файла который мы режем, меняем или отправляем на сервер, или что-то подобное. У меня все минимально, только то что потребуется в дальнейшем. state — стоит по дефолту CREATE, возможно в другом случае по дефолту статуса быть не может, по этому его придется убрать и он будет ставится или через конструктор или через сеттер.

Потом мы создаем несколько базовых классов и интерфейсов, которые будут у нас иметь основной функционал для работы с очередью.

BaseWorkItem.kt
import dajver.com.taskqueueexample.models.MetaDataModel

open class BaseWorkItem(open var metadata: MetaDataModel) {

    override fun hashCode(): Int {
        return 0
    }

    override fun equals(other: Any?): Boolean {
        if (this === other) {
            return true
        }
        if (other == null || this::class != other::class) {
            return false
        }
        return metadata.id == (other as BaseWorkItem).metadata.id
    }
}
Этот work item у нас является базовой точкой в которой у нас идет переопределение метода equals, для того что-бы мы могли понять являются ли work item's которые мы хотим сравнить — одинаковые или нет. Это мы будем понимать по ID которые у нас будут уникальные и которые мы сравниваем в методе equals. Так же у нас каждый work item с которым мы будем дальше работать будет наследником BaseWorkItem, по этому любой из имеющихся work item's мы сможем сравнить и использовать в базовых классах.

Далее создаем несколько лисенеров которые мы будем использовать для кидания колбеков из базовых классов в наши процессы и наоборот из процессов в базовый класс.

BaseWorkListener.kt
import dajver.com.taskqueueexample.queue.base.model.BaseWorkItem

interface BaseWorkListener {
    fun onJobComplete(workItem: BaseWorkItem)

    fun onJobFailed(exception: Exception, workItem: BaseWorkItem)
}
Этот листенер мы будем использовать для сигнализации в класс менеджер о том что данный процесс или закончил свою работу или зафейлился и упал и в классе менеджере мы будем уже дальше решать что делать, или двигаться на следующий процесс или выводить сообщение об ошибке.

BaseProcessCallback.kt
import dajver.com.taskqueueexample.queue.base.model.BaseWorkItem

interface BaseProcessCallback<T> where  T : BaseWorkItem {
    fun onSuccess(workItem: T)

    fun onError(workItem: T, error: String?)
}
Так же у нас будет вот такой еще один лисенер, который будет кидать колбеки в базовый класс процессов, и в зависимости от прилетевшего колбека наш базовый класс будет уже проверять есть ли у нас в очереди еще какие-то задачи, и если есть то будет двигать их дальше, иначе если они остуствуют то просто выходить и останавливать работу.

BaseTaskProcessor.kt
import dajver.com.taskqueueexample.queue.base.interfaces.BaseProcessCallback
import dajver.com.taskqueueexample.queue.base.interfaces.BaseWorkListener
import dajver.com.taskqueueexample.queue.base.model.BaseWorkItem

abstract class BaseTaskProcessor<T> : BaseProcessCallback<T> where T : BaseWorkItem {

    private var queue = LinkedHashSet<T>()

    private var currentProcessingWorkItem: BaseWorkItem? = null
    protected var listener: BaseWorkListener? = null

    fun queueWorkItem(workItem: T) {
        if (currentProcessingWorkItem == null || currentProcessingWorkItem!! != workItem) {
            queue.add(workItem)
            checkWorkQueue()
        }
    }

    override fun onSuccess(workItem: T) {
        currentProcessingWorkItem = null
        checkWorkQueue()
    }

    override fun onError(workItem: T, error: String?) {
        currentProcessingWorkItem = null
        checkWorkQueue()
    }

    private fun checkWorkQueue() {
        if (currentProcessingWorkItem == null && queue.isNotEmpty()) {
            val currentItem = queue.first()
            queue.remove(currentItem)
            currentProcessingWorkItem = currentItem
            processWorkItem(currentItem, this)
        }
    }

    open fun cancelAll() {
        currentProcessingWorkItem = null
        queue.clear()
    }

    protected abstract fun processWorkItem(item: T, baseProcessCallback: BaseProcessCallback<T>)
}
Вот собственно наш базовый класс который будет следить за очередью, данный класс наследуются от BaseProcessCallback для получения колбеков из процессов которые подпишутся на изменения в данном классе. Так же у нас есть какая-то «Т» которая у нас обозначена как BaseWorkItem, а это значит что мы будем передавать WorkItem который будет наследником BaseWorkItem и в котором будут находится какие-то дополнительные поля которые понадобятся в ходе работы процессов.

— queueWorkItem — добавляет все процессы в очередь и проверяет если список не пуст и в нем находятся какие-то процессы.
— checkWorkQueue — берем первый айтем из списка и выполняем его с помощью метода processWorkItem который является абстрактным методом, он будет описываться в дочерних классах этого базового класса.
— onSuccess / onError — у нас проверяет есть ли что-то еще в очереди и запускает его если еще что-то есть.

С базовой частью мы закончили. Это по сути список всех классов которые у нас на данный момент будут использоваться в дочерних классах, кроме MetaDataModel. Далее мы будем создавать сами процессы которые будут дальше запускаться в очереди в менеджере процессов. 

Как я говорил раньше, у нас будет три процесса: CREATE, WORK, CLEAN. Для каждого из этих процессов нам нужно создать отдельный WorkItem и Process в которые мы будем передавать нужные данные и получать колбеки с результатами работы.

CreateWorkItem.kt
import dajver.com.taskqueueexample.models.MetaDataModel
import dajver.com.taskqueueexample.queue.base.model.BaseWorkItem

class CreateWorkItem(override var metadata: MetaDataModel) : BaseWorkItem(metadata)
В данном айтеме у нас в конструкторе передаем MetaDataModel в которой задаем уникальный ID процесса и статус. Так же мы унаследовали для данного класса BaseWorkItem и передали metadata который мы передали в конструкторе.

CreateProcess.kt
import android.os.Handler
import dajver.com.taskqueueexample.queue.base.BaseTaskProcessor
import dajver.com.taskqueueexample.queue.base.interfaces.BaseProcessCallback
import dajver.com.taskqueueexample.queue.base.interfaces.BaseWorkListener
import dajver.com.taskqueueexample.queue.flow.QueueFlowManager.Companion.SECONDS_IN_ONE_PROCESS

class CreateProcess(var workListener: BaseWorkListener) : BaseTaskProcessor<CreateWorkItem>() {

    init {
        listener = workListener
    }

    override fun processWorkItem(item: CreateWorkItem, baseProcessCallback: BaseProcessCallback<CreateWorkItem>) {
        Handler().postDelayed({
            // do something here and on success call - onSuccess and on fail - onError
            try {
                baseProcessCallback.onSuccess(item)
                workListener.onJobComplete(item)
            } catch (e: Exception) {
                baseProcessCallback.onError(item, e.message!!)
                workListener.onJobFailed(e, item)
            }
        }, SECONDS_IN_ONE_PROCESS!!)
    }
}
Что же мы тут видим? Мы подписались в конструкторе на лисенера который будет кидать коллбеки в BaseTaskProcessor, так же мы унаследовались от BaseTaskProcessor для определения метода processWorkItem и описали работу данного класса в этом методе. В данном случае у нас просто хендлер который 5 секунду будет ничего не делать, а дальше кинет колбеки в BaseTaskProcessor и в менеджера процессов, который мы опишем далее. Таких классов у нас еще будет два, я сюда их добавлять не буду, их можно будет найти по этим двум ссылкам: WORK — WorkWorkItem.ktWorkProcess.kt, CLEAN — CleanWorkItem.ktCleanProcess.kt

Далее нам нужно создать менеджера который будет управлять этими всеми процессами и запускать их по колбекам которые будут прилетать из этих же процессов. Для начала создадим интерфейс для визуализации работы очереди.

QueueFlowListener.kt
import dajver.com.taskqueueexample.models.MetaDataModel

interface QueueFlowListener {
    fun onWorkItemStateChange(metaDataModel: MetaDataModel)
}
Данный интерфейс нам нужен для отправки колбека о статусе текущего процесса в активити для отображения в текстовых полях, какой на данный момент процесс в очереди, и сколько до окончания работы онного.

QueueFlowManager.kt 
import android.util.Log
import dajver.com.taskqueueexample.models.MetaDataModel
import dajver.com.taskqueueexample.queue.base.interfaces.BaseWorkListener
import dajver.com.taskqueueexample.queue.base.model.BaseWorkItem
import dajver.com.taskqueueexample.queue.enums.Statuses.CLEAN
import dajver.com.taskqueueexample.queue.enums.Statuses.CREATE
import dajver.com.taskqueueexample.queue.enums.Statuses.WORK
import dajver.com.taskqueueexample.queue.flow.clean.CleanProcess
import dajver.com.taskqueueexample.queue.flow.clean.CleanWorkItem
import dajver.com.taskqueueexample.queue.flow.create.CreateWorkItem
import dajver.com.taskqueueexample.queue.flow.work.WorkProcess
import dajver.com.taskqueueexample.queue.flow.work.WorkWorkItem
import dajver.com.taskqueueexample.queue.flow.create.CreateProcess

class QueueFlowManager(var queueFlowListener: QueueFlowListener) : BaseWorkListener {

    private var createProcess: CreateProcess? = null
    private var workProcess: WorkProcess? = null
    private var cleanProcess: CleanProcess? = null
    private var metaDataModel: MetaDataModel? = null

    init {
        createProcess = CreateProcess(this)
        workProcess = WorkProcess(this)
        cleanProcess = CleanProcess(this)

        metaDataModel = MetaDataModel(1234)

        makeSequentialStatusMap()
    }

    private fun makeSequentialStatusMap() {
        val statusMap = ArrayList<String>()
        statusMap.add(CREATE)
        statusMap.add(WORK)
        statusMap.add(CLEAN)

        sequentialStatusMap = statusMap
    }

    fun startFlow() {
        onStateChanged(metaDataModel!!.state!!)
    }

    private fun onStateChanged(state: String) {
        metaDataModel!!.state = state

        when (state) {
            CREATE -> {
                val createWorkItem = CreateWorkItem(metaDataModel!!)
                createProcess!!.queueWorkItem(createWorkItem)
            }
            WORK -> {
                val workWorkItem = WorkWorkItem(metaDataModel!!)
                workProcess!!.queueWorkItem(workWorkItem)
            }
            CLEAN -> {
                val cleanWorkItem = CleanWorkItem(metaDataModel!!)
                cleanProcess!!.queueWorkItem(cleanWorkItem)
            }
            else -> Log.e(TAG, "error of the flow, something went wrong")
        }

        queueFlowListener.onWorkItemStateChange(metaDataModel!!)
    }

    override fun onJobComplete(workItem: BaseWorkItem) {
        val currentState = workItem.metadata.state

        val nextStateIndex = sequentialStatusMap!!.indexOf(currentState)
        val nextState = sequentialStatusMap!![nextStateIndex + 1]

        onStateChanged(nextState)
    }

    override fun onJobFailed(e: Exception, workItem: BaseWorkItem) {
        workItem.metadata.statusReason = e.message
        workItem.metadata.stateBeforeFail = workItem.metadata.state
    }

    companion object {
        private val TAG : String? = QueueFlowManager::class.java.simpleName
        val SECONDS_IN_ONE_PROCESS : Long? = 5 * 1000

        private var sequentialStatusMap: List<String>? = null
    }
}
В init мы прописываем все процессы которые у нас будут и подписываемся на их коллбеки, что бы по результату получать какой-то фидбек и стартовать новый процесс.
— makeSequentialStatusMap — мы создаем список в который задаем все процессы которые мы будем запускать. 
— onStateChanged — изменяет текущий стейт в модели которая у нас является текущей для работы с очередью. В нем же по порядку, в зависимости от полученного стейта запускается нужный процесс. Далее вызывается onWorkItemStateChange для отправки колбека о текущем стейте в активити.
— onJobComplete — мы получаем текущий стейт, увеличиваем его индекс и запускаем следующий стейт в работу.
— onJobFailed — записываем ошибки с которой упала очередь и стейт на котором оно упало.
— startFlow — запускает флоу с последнего который ему задали в onStateChange.

activity_main.xml
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
        xmlns:android="http://schemas.android.com/apk/res/android"
        xmlns:tools="http://schemas.android.com/tools"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        tools:context=".MainActivity"
        android:gravity="center"
        android:orientation="vertical">

    <TextView
            android:layout_width="wrap_content"
            android:layout_height="wrap_content" 
            android:id="@+id/timer"
            android:textColor="@android:color/black"/>
    <TextView
            android:id="@+id/text"
            android:layout_width="wrap_content"
            android:layout_height="wrap_content"
            android:textColor="@android:color/black"
            android:textSize="24sp"/>
    <Button
            android:text="Start"
            android:layout_width="wrap_content"
            android:layout_height="wrap_content"
            android:id="@+id/startButton"/>

</LinearLayout>
Вот так будет выглядеть наш леяут, на нем будет три элемента, два текстовых поля и одна кнопка, по нажатию на которую мы будем стартовать нашу очередь.

MainActivity.kt
import android.os.Bundle
import android.os.CountDownTimer
import android.view.View
import androidx.appcompat.app.AppCompatActivity
import dajver.com.taskqueueexample.models.MetaDataModel
import dajver.com.taskqueueexample.queue.enums.Statuses
import dajver.com.taskqueueexample.queue.flow.QueueFlowListener
import dajver.com.taskqueueexample.queue.flow.QueueFlowManager
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit

class MainActivity : AppCompatActivity(), QueueFlowListener {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        startButton.setOnClickListener {
            QueueFlowManager(this).startFlow()
        }
    }

    private val countDownTimer = object : CountDownTimer(6000, 1000) {
        override fun onTick(millisUntilFinished: Long) {
            val convertedTime = TimeUnit.MILLISECONDS.toSeconds(millisUntilFinished).toString()
            timer.text =  "Time left to next state: $convertedTime"
        }

        override fun onFinish() { }
    }

    override fun onWorkItemStateChange(metaDataModel: MetaDataModel) {
        text.text = "Current state of queue: ${metaDataModel.state}"

        startButton.visibility = if(metaDataModel.state == Statuses.CLEAN) View.VISIBLE else View.INVISIBLE
        if(metaDataModel.state != Statuses.CLEAN) {
            countDownTimer.start()
        }
    }
}
Из самого важного что я могу отметить в данной активити — это onCreate в котором по нажатию на кнопку мы стартуем очередь. В onWorkItemStateChange мы отображаем стейт и рестартуем таймер каждый раз когда меняется стейт процесса. countDownTimer — нужен чисто для визуального понимания через сколько запустится новый процесс в очереди. Ну и в onTick в countDownTimer'e мы обновляем текстовое поле которое касается таймера.

Вот такая штука эта очередь, надеюсь кому-то это будет полезно и когда нибудь пригодится, я лично с этим столкнулся и долго мучался что бы правильно это реализовать, как практика показала такой способ очень удобный и стабильный, очередь работает как часы, добавление нового элемента в очередь не составит никакого труда так как архитектура построена на принципе SOLID — каждый класс ответственнен за выполнение одного действия.

Исходники:
GitHub

Комментариев нет:

Отправить комментарий