mirror of
https://github.com/CHOMPStation2/CHOMPStation2.git
synced 2025-12-13 11:43:31 +00:00
initial commit for process scheduler
This commit is contained in:
17
code/controllers/ProcessScheduler/core/_define.dm
Normal file
17
code/controllers/ProcessScheduler/core/_define.dm
Normal file
@@ -0,0 +1,17 @@
|
||||
// Process status defines
|
||||
#define PROCESS_STATUS_IDLE 1
|
||||
#define PROCESS_STATUS_QUEUED 2
|
||||
#define PROCESS_STATUS_RUNNING 3
|
||||
#define PROCESS_STATUS_MAYBE_HUNG 4
|
||||
#define PROCESS_STATUS_PROBABLY_HUNG 5
|
||||
#define PROCESS_STATUS_HUNG 6
|
||||
|
||||
// Process time thresholds
|
||||
#define PROCESS_DEFAULT_HANG_WARNING_TIME 300 // 30 seconds
|
||||
#define PROCESS_DEFAULT_HANG_ALERT_TIME 600 // 60 seconds
|
||||
#define PROCESS_DEFAULT_HANG_RESTART_TIME 900 // 90 seconds
|
||||
#define PROCESS_DEFAULT_SCHEDULE_INTERVAL 50 // 50 ticks
|
||||
#define PROCESS_DEFAULT_SLEEP_INTERVAL 2 // 2 ticks
|
||||
#define PROCESS_DEFAULT_CPU_THRESHOLD 90 // 90%
|
||||
|
||||
//#define UPDATE_QUEUE_DEBUG
|
||||
38
code/controllers/ProcessScheduler/core/_stubs.dm
Normal file
38
code/controllers/ProcessScheduler/core/_stubs.dm
Normal file
@@ -0,0 +1,38 @@
|
||||
/**
|
||||
* _stubs.dm
|
||||
*
|
||||
* This file contains constructs that the process scheduler expects to exist
|
||||
* in a standard ss13 fork.
|
||||
*/
|
||||
/*
|
||||
/**
|
||||
* message_admins
|
||||
*
|
||||
* sends a message to admins
|
||||
*/
|
||||
/proc/message_admins(msg)
|
||||
world << msg
|
||||
*/
|
||||
/**
|
||||
* logTheThing
|
||||
*
|
||||
* In goonstation, this proc writes a message to either the world log or diary.
|
||||
*
|
||||
* Blame Keelin.
|
||||
*/
|
||||
/proc/logTheThing(type, source, target, text, diaryType)
|
||||
if(diaryType)
|
||||
world << "Diary: \[[diaryType]:[type]] [text]"
|
||||
else
|
||||
world << "Log: \[[type]] [text]"
|
||||
|
||||
/**
|
||||
* var/disposed
|
||||
*
|
||||
* In goonstation, disposed is set to 1 after an object enters the delete queue
|
||||
* or the object is placed in an object pool (effectively out-of-play so to speak)
|
||||
*/
|
||||
/datum/var/disposed
|
||||
// Garbage collection (controller).
|
||||
/datum/var/gcDestroyed
|
||||
/datum/var/timeDestroyed
|
||||
317
code/controllers/ProcessScheduler/core/process.dm
Normal file
317
code/controllers/ProcessScheduler/core/process.dm
Normal file
@@ -0,0 +1,317 @@
|
||||
// Process
|
||||
|
||||
/datum/controller/process
|
||||
/**
|
||||
* State vars
|
||||
*/
|
||||
// Main controller ref
|
||||
var/tmp/datum/controller/processScheduler/main
|
||||
|
||||
// 1 if process is not running or queued
|
||||
var/tmp/idle = 1
|
||||
|
||||
// 1 if process is queued
|
||||
var/tmp/queued = 0
|
||||
|
||||
// 1 if process is running
|
||||
var/tmp/running = 0
|
||||
|
||||
// 1 if process is blocked up
|
||||
var/tmp/hung = 0
|
||||
|
||||
// 1 if process was killed
|
||||
var/tmp/killed = 0
|
||||
|
||||
// Status text var
|
||||
var/tmp/status
|
||||
|
||||
// Previous status text var
|
||||
var/tmp/previousStatus
|
||||
|
||||
// 1 if process is disabled
|
||||
var/tmp/disabled = 0
|
||||
|
||||
/**
|
||||
* Config vars
|
||||
*/
|
||||
// Process name
|
||||
var/name
|
||||
|
||||
// Process schedule interval
|
||||
// This controls how often the process would run under ideal conditions.
|
||||
// If the process scheduler sees that the process has finished, it will wait until
|
||||
// this amount of time has elapsed from the start of the previous run to start the
|
||||
// process running again.
|
||||
var/tmp/schedule_interval = PROCESS_DEFAULT_SCHEDULE_INTERVAL // run every 50 ticks
|
||||
|
||||
// Process sleep interval
|
||||
// This controls how often the process will yield (call sleep(0)) while it is running.
|
||||
// Every concurrent process should sleep periodically while running in order to allow other
|
||||
// processes to execute concurrently.
|
||||
var/tmp/sleep_interval = PROCESS_DEFAULT_SLEEP_INTERVAL
|
||||
|
||||
// hang_warning_time - this is the time (in 1/10 seconds) after which the server will begin to show "maybe hung" in the context window
|
||||
var/tmp/hang_warning_time = PROCESS_DEFAULT_HANG_WARNING_TIME
|
||||
|
||||
// hang_alert_time - After this much time(in 1/10 seconds), the server will send an admin debug message saying the process may be hung
|
||||
var/tmp/hang_alert_time = PROCESS_DEFAULT_HANG_ALERT_TIME
|
||||
|
||||
// hang_restart_time - After this much time(in 1/10 seconds), the server will automatically kill and restart the process.
|
||||
var/tmp/hang_restart_time = PROCESS_DEFAULT_HANG_RESTART_TIME
|
||||
|
||||
// cpu_threshold - if world.cpu >= cpu_threshold, scheck() will call sleep(1) to defer further work until the next tick. This keeps a process from driving a tick into overtime (causing perceptible lag)
|
||||
var/tmp/cpu_threshold = PROCESS_DEFAULT_CPU_THRESHOLD
|
||||
|
||||
// How many times in the current run has the process deferred work till the next tick?
|
||||
var/tmp/cpu_defer_count = 0
|
||||
|
||||
/**
|
||||
* recordkeeping vars
|
||||
*/
|
||||
|
||||
// Records the time (server ticks) at which the process last finished sleeping
|
||||
var/tmp/last_slept = 0
|
||||
|
||||
// Records the time (s-ticks) at which the process last began running
|
||||
var/tmp/run_start = 0
|
||||
|
||||
// Records the number of times this process has been killed and restarted
|
||||
var/tmp/times_killed
|
||||
|
||||
// Tick count
|
||||
var/tmp/ticks = 0
|
||||
|
||||
var/tmp/last_task = ""
|
||||
|
||||
var/tmp/last_object
|
||||
|
||||
datum/controller/process/New(var/datum/controller/processScheduler/scheduler)
|
||||
..()
|
||||
main = scheduler
|
||||
previousStatus = "idle"
|
||||
idle()
|
||||
name = "process"
|
||||
schedule_interval = 50
|
||||
sleep_interval = 2
|
||||
last_slept = 0
|
||||
run_start = 0
|
||||
ticks = 0
|
||||
last_task = 0
|
||||
last_object = null
|
||||
|
||||
datum/controller/process/proc/started()
|
||||
// Initialize last_slept so we can know when to sleep
|
||||
last_slept = world.timeofday
|
||||
|
||||
// Initialize run_start so we can detect hung processes.
|
||||
run_start = world.timeofday
|
||||
|
||||
// Initialize defer count
|
||||
cpu_defer_count = 0
|
||||
|
||||
running()
|
||||
main.processStarted(src)
|
||||
|
||||
onStart()
|
||||
|
||||
datum/controller/process/proc/finished()
|
||||
ticks++
|
||||
idle()
|
||||
main.processFinished(src)
|
||||
|
||||
onFinish()
|
||||
|
||||
datum/controller/process/proc/doWork()
|
||||
|
||||
datum/controller/process/proc/setup()
|
||||
|
||||
datum/controller/process/proc/process()
|
||||
started()
|
||||
doWork()
|
||||
finished()
|
||||
|
||||
datum/controller/process/proc/running()
|
||||
idle = 0
|
||||
queued = 0
|
||||
running = 1
|
||||
hung = 0
|
||||
setStatus(PROCESS_STATUS_RUNNING)
|
||||
|
||||
datum/controller/process/proc/idle()
|
||||
queued = 0
|
||||
running = 0
|
||||
idle = 1
|
||||
hung = 0
|
||||
setStatus(PROCESS_STATUS_IDLE)
|
||||
|
||||
datum/controller/process/proc/queued()
|
||||
idle = 0
|
||||
running = 0
|
||||
queued = 1
|
||||
hung = 0
|
||||
setStatus(PROCESS_STATUS_QUEUED)
|
||||
|
||||
datum/controller/process/proc/hung()
|
||||
hung = 1
|
||||
setStatus(PROCESS_STATUS_HUNG)
|
||||
|
||||
datum/controller/process/proc/handleHung()
|
||||
var/datum/lastObj = last_object
|
||||
var/lastObjType = "null"
|
||||
if(istype(lastObj))
|
||||
lastObjType = lastObj.type
|
||||
|
||||
// If world.timeofday has rolled over, then we need to adjust.
|
||||
if (world.timeofday < run_start)
|
||||
run_start -= 864000
|
||||
|
||||
var/msg = "[name] process hung at tick #[ticks]. Process was unresponsive for [(world.timeofday - run_start) / 10] seconds and was restarted. Last task: [last_task]. Last Object Type: [lastObjType]"
|
||||
logTheThing("debug", null, null, msg)
|
||||
logTheThing("diary", null, null, msg, "debug")
|
||||
message_admins(msg)
|
||||
|
||||
main.restartProcess(src.name)
|
||||
|
||||
datum/controller/process/proc/kill()
|
||||
if (!killed)
|
||||
var/msg = "[name] process was killed at tick #[ticks]."
|
||||
logTheThing("debug", null, null, msg)
|
||||
logTheThing("diary", null, null, msg, "debug")
|
||||
//finished()
|
||||
|
||||
// Allow inheritors to clean up if needed
|
||||
onKill()
|
||||
|
||||
killed = TRUE
|
||||
|
||||
// This should del
|
||||
del(src)
|
||||
|
||||
datum/controller/process/proc/scheck(var/tickId = 0)
|
||||
if (killed)
|
||||
// The kill proc is the only place where killed is set.
|
||||
// The kill proc should have deleted this datum, and all sleeping procs that are
|
||||
// owned by it.
|
||||
CRASH("A killed process is still running somehow...")
|
||||
|
||||
// For each tick the process defers, it increments the cpu_defer_count so we don't
|
||||
// defer indefinitely
|
||||
if (world.cpu >= cpu_threshold + cpu_defer_count * 10)
|
||||
sleep(1)
|
||||
cpu_defer_count++
|
||||
last_slept = world.timeofday
|
||||
else
|
||||
// If world.timeofday has rolled over, then we need to adjust.
|
||||
if (world.timeofday < last_slept)
|
||||
last_slept -= 864000
|
||||
|
||||
if (world.timeofday > last_slept + sleep_interval)
|
||||
// If we haven't slept in sleep_interval ticks, sleep to allow other work to proceed.
|
||||
sleep(0)
|
||||
last_slept = world.timeofday
|
||||
|
||||
datum/controller/process/proc/update()
|
||||
// Clear delta
|
||||
if(previousStatus != status)
|
||||
setStatus(status)
|
||||
|
||||
var/elapsedTime = getElapsedTime()
|
||||
|
||||
if (elapsedTime > hang_restart_time)
|
||||
hung()
|
||||
else if (elapsedTime > hang_alert_time)
|
||||
setStatus(PROCESS_STATUS_PROBABLY_HUNG)
|
||||
else if (elapsedTime > hang_warning_time)
|
||||
setStatus(PROCESS_STATUS_MAYBE_HUNG)
|
||||
|
||||
datum/controller/process/proc/getElapsedTime()
|
||||
if (world.timeofday < run_start)
|
||||
return world.timeofday - (run_start - 864000)
|
||||
return world.timeofday - run_start
|
||||
|
||||
datum/controller/process/proc/tickDetail()
|
||||
return
|
||||
|
||||
datum/controller/process/proc/getContext()
|
||||
return "<tr><td>[name]</td><td>[main.averageRunTime(src)]</td><td>[main.last_run_time[src]]</td><td>[main.highest_run_time[src]]</td><td>[ticks]</td></tr>\n"
|
||||
|
||||
datum/controller/process/proc/getContextData()
|
||||
return list(
|
||||
"name" = name,
|
||||
"averageRunTime" = main.averageRunTime(src),
|
||||
"lastRunTime" = main.last_run_time[src],
|
||||
"highestRunTime" = main.highest_run_time[src],
|
||||
"ticks" = ticks,
|
||||
"schedule" = schedule_interval,
|
||||
"status" = getStatusText(),
|
||||
"disabled" = disabled
|
||||
)
|
||||
|
||||
datum/controller/process/proc/getStatus()
|
||||
return status
|
||||
|
||||
datum/controller/process/proc/getStatusText(var/s = 0)
|
||||
if(!s)
|
||||
s = status
|
||||
switch(s)
|
||||
if(PROCESS_STATUS_IDLE)
|
||||
return "idle"
|
||||
if(PROCESS_STATUS_QUEUED)
|
||||
return "queued"
|
||||
if(PROCESS_STATUS_RUNNING)
|
||||
return "running"
|
||||
if(PROCESS_STATUS_MAYBE_HUNG)
|
||||
return "maybe hung"
|
||||
if(PROCESS_STATUS_PROBABLY_HUNG)
|
||||
return "probably hung"
|
||||
if(PROCESS_STATUS_HUNG)
|
||||
return "HUNG"
|
||||
else
|
||||
return "UNKNOWN"
|
||||
|
||||
datum/controller/process/proc/getPreviousStatus()
|
||||
return previousStatus
|
||||
|
||||
datum/controller/process/proc/getPreviousStatusText()
|
||||
return getStatusText(previousStatus)
|
||||
|
||||
datum/controller/process/proc/setStatus(var/newStatus)
|
||||
previousStatus = status
|
||||
status = newStatus
|
||||
|
||||
datum/controller/process/proc/setLastTask(var/task, var/object)
|
||||
last_task = task
|
||||
last_object = object
|
||||
|
||||
datum/controller/process/proc/_copyStateFrom(var/datum/controller/process/target)
|
||||
main = target.main
|
||||
name = target.name
|
||||
schedule_interval = target.schedule_interval
|
||||
sleep_interval = target.sleep_interval
|
||||
last_slept = 0
|
||||
run_start = 0
|
||||
times_killed = target.times_killed
|
||||
ticks = target.ticks
|
||||
last_task = target.last_task
|
||||
last_object = target.last_object
|
||||
copyStateFrom(target)
|
||||
|
||||
datum/controller/process/proc/copyStateFrom(var/datum/controller/process/target)
|
||||
|
||||
datum/controller/process/proc/onKill()
|
||||
|
||||
datum/controller/process/proc/onStart()
|
||||
|
||||
datum/controller/process/proc/onFinish()
|
||||
|
||||
datum/controller/process/proc/disable()
|
||||
disabled = 1
|
||||
|
||||
datum/controller/process/proc/enable()
|
||||
disabled = 0
|
||||
|
||||
/datum/controller/process/proc/getLastRunTime()
|
||||
return main.getProcessLastRunTime(src)
|
||||
|
||||
/datum/controller/process/proc/getTicks()
|
||||
return ticks
|
||||
320
code/controllers/ProcessScheduler/core/processScheduler.dm
Normal file
320
code/controllers/ProcessScheduler/core/processScheduler.dm
Normal file
@@ -0,0 +1,320 @@
|
||||
// Singleton instance of game_controller_new, setup in world.New()
|
||||
var/global/datum/controller/processScheduler/processScheduler
|
||||
|
||||
/datum/controller/processScheduler
|
||||
// Processes known by the scheduler
|
||||
var/tmp/datum/controller/process/list/processes = new
|
||||
|
||||
// Processes that are currently running
|
||||
var/tmp/datum/controller/process/list/running = new
|
||||
|
||||
// Processes that are idle
|
||||
var/tmp/datum/controller/process/list/idle = new
|
||||
|
||||
// Processes that are queued to run
|
||||
var/tmp/datum/controller/process/list/queued = new
|
||||
|
||||
// Process name -> process object map
|
||||
var/tmp/datum/controller/process/list/nameToProcessMap = new
|
||||
|
||||
// Process last start times
|
||||
var/tmp/datum/controller/process/list/last_start = new
|
||||
|
||||
// Process last run durations
|
||||
var/tmp/datum/controller/process/list/last_run_time = new
|
||||
|
||||
// Per process list of the last 20 durations
|
||||
var/tmp/datum/controller/process/list/last_twenty_run_times = new
|
||||
|
||||
// Process highest run time
|
||||
var/tmp/datum/controller/process/list/highest_run_time = new
|
||||
|
||||
// Sleep 1 tick -- This may be too aggressive.
|
||||
var/tmp/scheduler_sleep_interval = 1
|
||||
|
||||
// Controls whether the scheduler is running or not
|
||||
var/tmp/isRunning = 0
|
||||
|
||||
// Setup for these processes will be deferred until all the other processes are set up.
|
||||
var/tmp/list/deferredSetupList = new
|
||||
|
||||
/**
|
||||
* deferSetupFor
|
||||
* @param path processPath
|
||||
* If a process needs to be initialized after everything else, add it to
|
||||
* the deferred setup list. On goonstation, only the ticker needs to have
|
||||
* this treatment.
|
||||
*/
|
||||
/datum/controller/processScheduler/proc/deferSetupFor(var/processPath)
|
||||
if (!(processPath in deferredSetupList))
|
||||
deferredSetupList += processPath
|
||||
|
||||
/datum/controller/processScheduler/proc/setup()
|
||||
// There can be only one
|
||||
if(processScheduler && (processScheduler != src))
|
||||
del(src)
|
||||
return 0
|
||||
|
||||
var/process
|
||||
// Add all the processes we can find, except for the ticker
|
||||
for (process in typesof(/datum/controller/process) - /datum/controller/process)
|
||||
if (!(process in deferredSetupList))
|
||||
addProcess(new process(src))
|
||||
|
||||
for (process in deferredSetupList)
|
||||
addProcess(new process(src))
|
||||
|
||||
/datum/controller/processScheduler/proc/start()
|
||||
isRunning = 1
|
||||
spawn(0)
|
||||
process()
|
||||
|
||||
/datum/controller/processScheduler/proc/process()
|
||||
while(isRunning)
|
||||
checkRunningProcesses()
|
||||
queueProcesses()
|
||||
runQueuedProcesses()
|
||||
sleep(scheduler_sleep_interval)
|
||||
|
||||
/datum/controller/processScheduler/proc/stop()
|
||||
isRunning = 0
|
||||
|
||||
/datum/controller/processScheduler/proc/checkRunningProcesses()
|
||||
for(var/datum/controller/process/p in running)
|
||||
p.update()
|
||||
|
||||
if (isnull(p)) // Process was killed
|
||||
continue
|
||||
|
||||
var/status = p.getStatus()
|
||||
var/previousStatus = p.getPreviousStatus()
|
||||
|
||||
// Check status changes
|
||||
if(status != previousStatus)
|
||||
//Status changed.
|
||||
|
||||
switch(status)
|
||||
if(PROCESS_STATUS_MAYBE_HUNG)
|
||||
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
|
||||
if(PROCESS_STATUS_PROBABLY_HUNG)
|
||||
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
|
||||
if(PROCESS_STATUS_HUNG)
|
||||
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
|
||||
p.handleHung()
|
||||
|
||||
/datum/controller/processScheduler/proc/queueProcesses()
|
||||
for(var/datum/controller/process/p in processes)
|
||||
// Don't double-queue, don't queue running processes
|
||||
if (p.disabled || p.running || p.queued || !p.idle)
|
||||
continue
|
||||
|
||||
// If world.timeofday has rolled over, then we need to adjust.
|
||||
if (world.timeofday < last_start[p])
|
||||
last_start[p] -= 864000
|
||||
|
||||
// If the process should be running by now, go ahead and queue it
|
||||
if (world.timeofday > last_start[p] + p.schedule_interval)
|
||||
setQueuedProcessState(p)
|
||||
|
||||
/datum/controller/processScheduler/proc/runQueuedProcesses()
|
||||
for(var/datum/controller/process/p in queued)
|
||||
runProcess(p)
|
||||
|
||||
/datum/controller/processScheduler/proc/addProcess(var/datum/controller/process/process)
|
||||
processes.Add(process)
|
||||
process.idle()
|
||||
idle.Add(process)
|
||||
|
||||
// init recordkeeping vars
|
||||
last_start.Add(process)
|
||||
last_start[process] = 0
|
||||
last_run_time.Add(process)
|
||||
last_run_time[process] = 0
|
||||
last_twenty_run_times.Add(process)
|
||||
last_twenty_run_times[process] = list()
|
||||
highest_run_time.Add(process)
|
||||
highest_run_time[process] = 0
|
||||
|
||||
// init starts and stops record starts
|
||||
recordStart(process, 0)
|
||||
recordEnd(process, 0)
|
||||
|
||||
// Set up process
|
||||
process.setup()
|
||||
|
||||
// Save process in the name -> process map
|
||||
nameToProcessMap[process.name] = process
|
||||
|
||||
/datum/controller/processScheduler/proc/replaceProcess(var/datum/controller/process/oldProcess, var/datum/controller/process/newProcess)
|
||||
processes.Remove(oldProcess)
|
||||
processes.Add(newProcess)
|
||||
|
||||
newProcess.idle()
|
||||
idle.Remove(oldProcess)
|
||||
running.Remove(oldProcess)
|
||||
queued.Remove(oldProcess)
|
||||
idle.Add(newProcess)
|
||||
|
||||
last_start.Remove(oldProcess)
|
||||
last_start.Add(newProcess)
|
||||
last_start[newProcess] = 0
|
||||
|
||||
last_run_time.Add(newProcess)
|
||||
last_run_time[newProcess] = last_run_time[oldProcess]
|
||||
last_run_time.Remove(oldProcess)
|
||||
|
||||
last_twenty_run_times.Add(newProcess)
|
||||
last_twenty_run_times[newProcess] = last_twenty_run_times[oldProcess]
|
||||
last_twenty_run_times.Remove(oldProcess)
|
||||
|
||||
highest_run_time.Add(newProcess)
|
||||
highest_run_time[newProcess] = highest_run_time[oldProcess]
|
||||
highest_run_time.Remove(oldProcess)
|
||||
|
||||
recordStart(newProcess, 0)
|
||||
recordEnd(newProcess, 0)
|
||||
|
||||
nameToProcessMap[newProcess.name] = newProcess
|
||||
|
||||
|
||||
/datum/controller/processScheduler/proc/runProcess(var/datum/controller/process/process)
|
||||
spawn(0)
|
||||
process.process()
|
||||
|
||||
/datum/controller/processScheduler/proc/processStarted(var/datum/controller/process/process)
|
||||
setRunningProcessState(process)
|
||||
recordStart(process)
|
||||
|
||||
/datum/controller/processScheduler/proc/processFinished(var/datum/controller/process/process)
|
||||
setIdleProcessState(process)
|
||||
recordEnd(process)
|
||||
|
||||
/datum/controller/processScheduler/proc/setIdleProcessState(var/datum/controller/process/process)
|
||||
if (process in running)
|
||||
running -= process
|
||||
if (process in queued)
|
||||
queued -= process
|
||||
if (!(process in idle))
|
||||
idle += process
|
||||
|
||||
process.idle()
|
||||
|
||||
/datum/controller/processScheduler/proc/setQueuedProcessState(var/datum/controller/process/process)
|
||||
if (process in running)
|
||||
running -= process
|
||||
if (process in idle)
|
||||
idle -= process
|
||||
if (!(process in queued))
|
||||
queued += process
|
||||
|
||||
// The other state transitions are handled internally by the process.
|
||||
process.queued()
|
||||
|
||||
/datum/controller/processScheduler/proc/setRunningProcessState(var/datum/controller/process/process)
|
||||
if (process in queued)
|
||||
queued -= process
|
||||
if (process in idle)
|
||||
idle -= process
|
||||
if (!(process in running))
|
||||
running += process
|
||||
|
||||
process.running()
|
||||
|
||||
/datum/controller/processScheduler/proc/recordStart(var/datum/controller/process/process, var/time = null)
|
||||
if (isnull(time))
|
||||
time = world.timeofday
|
||||
|
||||
last_start[process] = time
|
||||
|
||||
/datum/controller/processScheduler/proc/recordEnd(var/datum/controller/process/process, var/time = null)
|
||||
if (isnull(time))
|
||||
time = world.timeofday
|
||||
|
||||
// If world.timeofday has rolled over, then we need to adjust.
|
||||
if (time < last_start[process])
|
||||
last_start[process] -= 864000
|
||||
|
||||
var/lastRunTime = time - last_start[process]
|
||||
|
||||
if(lastRunTime < 0)
|
||||
lastRunTime = 0
|
||||
|
||||
recordRunTime(process, lastRunTime)
|
||||
|
||||
/**
|
||||
* recordRunTime
|
||||
* Records a run time for a process
|
||||
*/
|
||||
/datum/controller/processScheduler/proc/recordRunTime(var/datum/controller/process/process, time)
|
||||
last_run_time[process] = time
|
||||
if(time > highest_run_time[process])
|
||||
highest_run_time[process] = time
|
||||
|
||||
var/list/lastTwenty = last_twenty_run_times[process]
|
||||
if (lastTwenty.len == 20)
|
||||
lastTwenty.Cut(1, 2)
|
||||
lastTwenty.len++
|
||||
lastTwenty[lastTwenty.len] = time
|
||||
|
||||
/**
|
||||
* averageRunTime
|
||||
* returns the average run time (over the last 20) of the process
|
||||
*/
|
||||
/datum/controller/processScheduler/proc/averageRunTime(var/datum/controller/process/process)
|
||||
var/lastTwenty = last_twenty_run_times[process]
|
||||
|
||||
var/t = 0
|
||||
var/c = 0
|
||||
for(var/time in lastTwenty)
|
||||
t += time
|
||||
c++
|
||||
|
||||
if(c > 0)
|
||||
return t / c
|
||||
return c
|
||||
|
||||
/datum/controller/processScheduler/proc/getStatusData()
|
||||
var/list/data = new
|
||||
|
||||
for (var/datum/controller/process/p in processes)
|
||||
data.len++
|
||||
data[data.len] = p.getContextData()
|
||||
|
||||
return data
|
||||
|
||||
/datum/controller/processScheduler/proc/getProcessCount()
|
||||
return processes.len
|
||||
|
||||
/datum/controller/processScheduler/proc/hasProcess(var/processName as text)
|
||||
if (nameToProcessMap[processName])
|
||||
return 1
|
||||
|
||||
/datum/controller/processScheduler/proc/killProcess(var/processName as text)
|
||||
restartProcess(processName)
|
||||
|
||||
/datum/controller/processScheduler/proc/restartProcess(var/processName as text)
|
||||
if (hasProcess(processName))
|
||||
var/datum/controller/process/oldInstance = nameToProcessMap[processName]
|
||||
var/datum/controller/process/newInstance = new oldInstance.type(src)
|
||||
newInstance._copyStateFrom(oldInstance)
|
||||
replaceProcess(oldInstance, newInstance)
|
||||
oldInstance.kill()
|
||||
|
||||
/datum/controller/processScheduler/proc/enableProcess(var/processName as text)
|
||||
if (hasProcess(processName))
|
||||
var/datum/controller/process/process = nameToProcessMap[processName]
|
||||
process.enable()
|
||||
|
||||
/datum/controller/processScheduler/proc/disableProcess(var/processName as text)
|
||||
if (hasProcess(processName))
|
||||
var/datum/controller/process/process = nameToProcessMap[processName]
|
||||
process.disable()
|
||||
|
||||
/datum/controller/processScheduler/proc/getProcess(var/name)
|
||||
return nameToProcessMap[name]
|
||||
|
||||
/datum/controller/processScheduler/proc/getProcessLastRunTime(var/datum/controller/process/process)
|
||||
return last_run_time[process]
|
||||
|
||||
/datum/controller/processScheduler/proc/getIsRunning()
|
||||
return isRunning
|
||||
127
code/controllers/ProcessScheduler/core/updateQueue.dm
Normal file
127
code/controllers/ProcessScheduler/core/updateQueue.dm
Normal file
@@ -0,0 +1,127 @@
|
||||
/**
|
||||
* updateQueue.dm
|
||||
*/
|
||||
|
||||
#ifdef UPDATE_QUEUE_DEBUG
|
||||
#define uq_dbg(text) world << text
|
||||
#else
|
||||
#define uq_dbg(text)
|
||||
#endif
|
||||
/datum/updateQueue
|
||||
var/tmp/list/objects
|
||||
var/tmp/previousStart
|
||||
var/tmp/procName
|
||||
var/tmp/list/arguments
|
||||
var/tmp/datum/updateQueueWorker/currentWorker
|
||||
var/tmp/workerTimeout
|
||||
var/tmp/adjustedWorkerTimeout
|
||||
var/tmp/currentKillCount
|
||||
var/tmp/totalKillCount
|
||||
|
||||
/datum/updateQueue/New(list/objects = list(), procName = "update", list/arguments = list(), workerTimeout = 2, inplace = 0)
|
||||
..()
|
||||
|
||||
uq_dbg("Update queue created.")
|
||||
|
||||
// Init proc allows for recycling the worker.
|
||||
init(objects = objects, procName = procName, arguments = arguments, workerTimeout = workerTimeout, inplace = inplace)
|
||||
|
||||
/**
|
||||
* init
|
||||
* @param list objects objects to update
|
||||
* @param text procName the proc to call on each item in the object list
|
||||
* @param list arguments optional arguments to pass to the update proc
|
||||
* @param number workerTimeout number of ticks to wait for an update to
|
||||
finish before forking a new update worker
|
||||
* @param bool inplace whether the updateQueue should make a copy of objects.
|
||||
the internal list will be modified, so it is usually
|
||||
a good idea to leave this alone. Default behavior is to
|
||||
copy.
|
||||
*/
|
||||
/datum/updateQueue/proc/init(list/objects = list(), procName = "update", list/arguments = list(), workerTimeout = 2, inplace = 0)
|
||||
uq_dbg("Update queue initialization started.")
|
||||
|
||||
if (!inplace)
|
||||
// Make an internal copy of the list so we're not modifying the original.
|
||||
initList(objects)
|
||||
else
|
||||
src.objects = objects
|
||||
|
||||
// Init vars
|
||||
src.procName = procName
|
||||
src.arguments = arguments
|
||||
src.workerTimeout = workerTimeout
|
||||
|
||||
adjustedWorkerTimeout = workerTimeout
|
||||
currentKillCount = 0
|
||||
totalKillCount = 0
|
||||
|
||||
uq_dbg("Update queue initialization finished. procName = '[procName]'")
|
||||
|
||||
/datum/updateQueue/proc/initList(list/toCopy)
|
||||
/**
|
||||
* We will copy the list in reverse order, as our doWork proc
|
||||
* will access them by popping an element off the end of the list.
|
||||
* This ends up being quite a lot faster than taking elements off
|
||||
* the head of the list.
|
||||
*/
|
||||
objects = new
|
||||
|
||||
uq_dbg("Copying [toCopy.len] items for processing.")
|
||||
|
||||
for(var/i=toCopy.len,i>0,)
|
||||
objects.len++
|
||||
objects[objects.len] = toCopy[i--]
|
||||
|
||||
/datum/updateQueue/proc/Run()
|
||||
uq_dbg("Starting run...")
|
||||
|
||||
startWorker()
|
||||
while (istype(currentWorker) && !currentWorker.finished)
|
||||
sleep(2)
|
||||
checkWorker()
|
||||
|
||||
uq_dbg("UpdateQueue completed run.")
|
||||
|
||||
/datum/updateQueue/proc/checkWorker()
|
||||
if(istype(currentWorker))
|
||||
// If world.timeofday has rolled over, then we need to adjust.
|
||||
if(world.timeofday < currentWorker.lastStart)
|
||||
currentWorker.lastStart -= 864000
|
||||
|
||||
if(world.timeofday - currentWorker.lastStart > adjustedWorkerTimeout)
|
||||
// This worker is a bit slow, let's spawn a new one and kill the old one.
|
||||
uq_dbg("Current worker is lagging... starting a new one.")
|
||||
killWorker()
|
||||
startWorker()
|
||||
else // No worker!
|
||||
uq_dbg("update queue ended up without a worker... starting a new one...")
|
||||
startWorker()
|
||||
|
||||
/datum/updateQueue/proc/startWorker()
|
||||
// only run the worker if we have objects to work on
|
||||
if(objects.len)
|
||||
uq_dbg("Starting worker process.")
|
||||
|
||||
// No need to create a fresh worker if we already have one...
|
||||
if (istype(currentWorker))
|
||||
currentWorker.init(objects, procName, arguments)
|
||||
else
|
||||
currentWorker = new(objects, procName, arguments)
|
||||
currentWorker.start()
|
||||
else
|
||||
uq_dbg("Queue is empty. No worker was started.")
|
||||
currentWorker = null
|
||||
|
||||
/datum/updateQueue/proc/killWorker()
|
||||
// Kill the worker
|
||||
currentWorker.kill()
|
||||
currentWorker = null
|
||||
// After we kill a worker, yield so that if the worker's been tying up the cpu, other stuff can immediately resume
|
||||
sleep(-1)
|
||||
currentKillCount++
|
||||
totalKillCount++
|
||||
if (currentKillCount >= 3)
|
||||
uq_dbg("[currentKillCount] workers have been killed with a timeout of [adjustedWorkerTimeout]. Increasing worker timeout to compensate.")
|
||||
adjustedWorkerTimeout++
|
||||
currentKillCount = 0
|
||||
83
code/controllers/ProcessScheduler/core/updateQueueWorker.dm
Normal file
83
code/controllers/ProcessScheduler/core/updateQueueWorker.dm
Normal file
@@ -0,0 +1,83 @@
|
||||
datum/updateQueueWorker
|
||||
var/tmp/list/objects
|
||||
var/tmp/killed
|
||||
var/tmp/finished
|
||||
var/tmp/procName
|
||||
var/tmp/list/arguments
|
||||
var/tmp/lastStart
|
||||
var/tmp/cpuThreshold
|
||||
|
||||
datum/updateQueueWorker/New(var/list/objects, var/procName, var/list/arguments, var/cpuThreshold = 90)
|
||||
..()
|
||||
uq_dbg("updateQueueWorker created.")
|
||||
|
||||
init(objects, procName, arguments, cpuThreshold)
|
||||
|
||||
datum/updateQueueWorker/proc/init(var/list/objects, var/procName, var/list/arguments, var/cpuThreshold = 90)
|
||||
src.objects = objects
|
||||
src.procName = procName
|
||||
src.arguments = arguments
|
||||
src.cpuThreshold = cpuThreshold
|
||||
|
||||
killed = 0
|
||||
finished = 0
|
||||
|
||||
datum/updateQueueWorker/proc/doWork()
|
||||
// If there's nothing left to execute or we were killed, mark finished and return.
|
||||
if (!objects || !objects.len) return finished()
|
||||
|
||||
lastStart = world.timeofday // Absolute number of ticks since the world started up
|
||||
|
||||
var/datum/object = objects[objects.len] // Pull out the object
|
||||
objects.len-- // Remove the object from the list
|
||||
|
||||
if (istype(object) && !isturf(object) && !object.disposed && isnull(object.gcDestroyed)) // We only work with real objects
|
||||
call(object, procName)(arglist(arguments))
|
||||
|
||||
// If there's nothing left to execute
|
||||
// or we were killed while running the above code, mark finished and return.
|
||||
if (!objects || !objects.len) return finished()
|
||||
|
||||
if (world.cpu > cpuThreshold)
|
||||
// We don't want to force a tick into overtime!
|
||||
// If the tick is about to go overtime, spawn the next update to go
|
||||
// in the next tick.
|
||||
uq_dbg("tick went into overtime with world.cpu = [world.cpu], deferred next update to next tick [1+(world.time / world.tick_lag)]")
|
||||
|
||||
spawn(1)
|
||||
doWork()
|
||||
else
|
||||
spawn(0) // Execute anonymous function immediately as if we were in a while loop...
|
||||
doWork()
|
||||
|
||||
datum/updateQueueWorker/proc/finished()
|
||||
uq_dbg("updateQueueWorker finished.")
|
||||
/**
|
||||
* If the worker was killed while it was working on something, it
|
||||
* should delete itself when it finally finishes working on it.
|
||||
* Meanwhile, the updateQueue will have proceeded on with the rest of
|
||||
* the queue. This will also terminate the spawned function that was
|
||||
* created in the kill() proc.
|
||||
*/
|
||||
if(killed)
|
||||
del(src)
|
||||
|
||||
finished = 1
|
||||
|
||||
datum/updateQueueWorker/proc/kill()
|
||||
uq_dbg("updateQueueWorker killed.")
|
||||
killed = 1
|
||||
objects = null
|
||||
|
||||
/**
|
||||
* If the worker is not done in 30 seconds after it's killed,
|
||||
* we'll forcibly delete it, causing the anonymous function it was
|
||||
* running to be terminated. Hasta la vista, baby.
|
||||
*/
|
||||
spawn(300)
|
||||
del(src)
|
||||
|
||||
datum/updateQueueWorker/proc/start()
|
||||
uq_dbg("updateQueueWorker started.")
|
||||
spawn(0)
|
||||
doWork()
|
||||
Reference in New Issue
Block a user