processScheduler Update to 510 (#398)

Updates code to be 510 compile compatible.

Also introduces the new updates to the GOON processScheduler, which should make for better gameplay and less lag. Specially on high population.
This commit is contained in:
skull132
2016-06-23 02:47:12 +03:00
committed by GitHub
parent 6dfd6bf3a0
commit 02b42862c1
47 changed files with 725 additions and 1303 deletions

View File

@@ -1,17 +0,0 @@
// Process status defines
#define PROCESS_STATUS_IDLE 1
#define PROCESS_STATUS_QUEUED 2
#define PROCESS_STATUS_RUNNING 3
#define PROCESS_STATUS_MAYBE_HUNG 4
#define PROCESS_STATUS_PROBABLY_HUNG 5
#define PROCESS_STATUS_HUNG 6
// Process time thresholds
#define PROCESS_DEFAULT_HANG_WARNING_TIME 300 // 30 seconds
#define PROCESS_DEFAULT_HANG_ALERT_TIME 600 // 60 seconds
#define PROCESS_DEFAULT_HANG_RESTART_TIME 900 // 90 seconds
#define PROCESS_DEFAULT_SCHEDULE_INTERVAL 50 // 50 ticks
#define PROCESS_DEFAULT_SLEEP_INTERVAL 2 // 2 ticks
#define PROCESS_DEFAULT_CPU_THRESHOLD 90 // 90%
//#define UPDATE_QUEUE_DEBUG

View File

@@ -4,15 +4,7 @@
* This file contains constructs that the process scheduler expects to exist
* in a standard ss13 fork.
*/
/*
/**
* message_admins
*
* sends a message to admins
*/
/proc/message_admins(msg)
world << msg
*/
/**
* logTheThing
*
@@ -25,14 +17,3 @@
world << "Diary: \[[diaryType]:[type]] [text]"
else
world << "Log: \[[type]] [text]"
/**
* var/disposed
*
* In goonstation, disposed is set to 1 after an object enters the delete queue
* or the object is placed in an object pool (effectively out-of-play so to speak)
*/
/datum/var/disposed
// Garbage collection (controller).
/datum/var/gcDestroyed
/datum/var/timeDestroyed

View File

@@ -44,11 +44,10 @@
// process running again.
var/tmp/schedule_interval = PROCESS_DEFAULT_SCHEDULE_INTERVAL // run every 50 ticks
// Process sleep interval
// This controls how often the process will yield (call sleep(0)) while it is running.
// Every concurrent process should sleep periodically while running in order to allow other
// processes to execute concurrently.
var/tmp/sleep_interval = PROCESS_DEFAULT_SLEEP_INTERVAL
// Process tick allowance
// This controls what percentage a single tick (0 to 100) the process should be.
// allowed to run before sleeping.
var/tmp/tick_allowance = PROCESS_DEFAULT_TICK_ALLOWANCE
// hang_warning_time - this is the time (in 1/10 seconds) after which the server will begin to show "maybe hung" in the context window
var/tmp/hang_warning_time = PROCESS_DEFAULT_HANG_WARNING_TIME
@@ -59,22 +58,25 @@
// hang_restart_time - After this much time(in 1/10 seconds), the server will automatically kill and restart the process.
var/tmp/hang_restart_time = PROCESS_DEFAULT_HANG_RESTART_TIME
// cpu_threshold - if world.cpu >= cpu_threshold, scheck() will call sleep(1) to defer further work until the next tick. This keeps a process from driving a tick into overtime (causing perceptible lag)
var/tmp/cpu_threshold = PROCESS_DEFAULT_CPU_THRESHOLD
// How many times in the current run has the process deferred work till the next tick?
var/tmp/cpu_defer_count = 0
// How many SCHECKs have been skipped (to limit btime calls)
var/tmp/calls_since_last_scheck = 0
/**
* recordkeeping vars
*/
// Records the time (server ticks) at which the process last finished sleeping
// Records the time (1/10s timeofday) at which the process last finished sleeping
var/tmp/last_slept = 0
// Records the time (s-ticks) at which the process last began running
// Records the time (1/10s timeofday) at which the process last began running
var/tmp/run_start = 0
// Records the world.tick_usage (0 to 100) at which the process last began running
var/tmp/tick_start = 0
// Records the number of times this process has been killed and restarted
var/tmp/times_killed
@@ -85,26 +87,36 @@
var/tmp/last_object
datum/controller/process/New(var/datum/controller/processScheduler/scheduler)
// Counts the number of times an exception has occurred; gets reset after 10
var/tmp/list/exceptions = list()
// Number of deciseconds to delay before starting the process
var/start_delay = 0
/datum/controller/process/New(var/datum/controller/processScheduler/scheduler)
..()
main = scheduler
previousStatus = "idle"
idle()
name = "process"
schedule_interval = 50
sleep_interval = 2
last_slept = 0
run_start = 0
tick_start = 0
ticks = 0
last_task = 0
last_object = null
datum/controller/process/proc/started()
// Initialize last_slept so we can know when to sleep
last_slept = world.timeofday
/datum/controller/process/proc/started()
var/timeofhour = TimeOfHour
// Initialize last_slept so we can record timing information
last_slept = timeofhour
// Initialize run_start so we can detect hung processes.
run_start = world.timeofday
run_start = timeofhour
// Initialize tick_start so we can know when to sleep
tick_start = world.tick_usage
// Initialize defer count
cpu_defer_count = 0
@@ -114,65 +126,65 @@ datum/controller/process/proc/started()
onStart()
datum/controller/process/proc/finished()
/datum/controller/process/proc/finished()
ticks++
idle()
main.processFinished(src)
onFinish()
datum/controller/process/proc/doWork()
/datum/controller/process/proc/doWork()
datum/controller/process/proc/setup()
/datum/controller/process/proc/setup()
datum/controller/process/proc/process()
/datum/controller/process/proc/process()
started()
doWork()
finished()
datum/controller/process/proc/running()
/datum/controller/process/proc/running()
idle = 0
queued = 0
running = 1
hung = 0
setStatus(PROCESS_STATUS_RUNNING)
datum/controller/process/proc/idle()
/datum/controller/process/proc/idle()
queued = 0
running = 0
idle = 1
hung = 0
setStatus(PROCESS_STATUS_IDLE)
datum/controller/process/proc/queued()
/datum/controller/process/proc/queued()
idle = 0
running = 0
queued = 1
hung = 0
setStatus(PROCESS_STATUS_QUEUED)
datum/controller/process/proc/hung()
/datum/controller/process/proc/hung()
hung = 1
setStatus(PROCESS_STATUS_HUNG)
datum/controller/process/proc/handleHung()
/datum/controller/process/proc/handleHung()
var/timeofhour = TimeOfHour
var/datum/lastObj = last_object
var/lastObjType = "null"
if(istype(lastObj))
lastObjType = lastObj.type
// If world.timeofday has rolled over, then we need to adjust.
if (world.timeofday < run_start)
run_start -= 864000
var/msg = "[name] process hung at tick #[ticks]. Process was unresponsive for [(world.timeofday - run_start) / 10] seconds and was restarted. Last task: [last_task]. Last Object Type: [lastObjType]"
// If timeofhour has rolled over, then we need to adjust.
if (timeofhour < run_start)
run_start -= 36000
var/msg = "[name] process hung at tick #[ticks]. Process was unresponsive for [(timeofhour - run_start) / 10] seconds and was restarted. Last task: [last_task]. Last Object Type: [lastObjType]"
logTheThing("debug", null, null, msg)
logTheThing("diary", null, null, msg, "debug")
message_admins(msg)
main.restartProcess(src.name)
datum/controller/process/proc/kill()
/datum/controller/process/proc/kill()
if (!killed)
var/msg = "[name] process was killed at tick #[ticks]."
logTheThing("debug", null, null, msg)
@@ -182,59 +194,63 @@ datum/controller/process/proc/kill()
// Allow inheritors to clean up if needed
onKill()
killed = TRUE
// This should del
del(src)
del(src) // This should del
datum/controller/process/proc/scheck(var/tickId = 0)
// Do not call this directly - use SHECK or SCHECK_EVERY
/datum/controller/process/proc/sleepCheck(var/tickId = 0)
calls_since_last_scheck = 0
if (killed)
// The kill proc is the only place where killed is set.
// The kill proc should have deleted this datum, and all sleeping procs that are
// owned by it.
CRASH("A killed process is still running somehow...")
if (hung)
// This will only really help if the doWork proc ends up in an infinite loop.
handleHung()
CRASH("Process [name] hung and was restarted.")
// For each tick the process defers, it increments the cpu_defer_count so we don't
// defer indefinitely
if (world.cpu >= cpu_threshold + cpu_defer_count * 10)
sleep(1)
if (world.tick_usage > 100 || (world.tick_usage - tick_start) > tick_allowance)
sleep(world.tick_lag)
cpu_defer_count++
last_slept = world.timeofday
else
// If world.timeofday has rolled over, then we need to adjust.
if (world.timeofday < last_slept)
last_slept -= 864000
last_slept = TimeOfHour
tick_start = world.tick_usage
if (world.timeofday > last_slept + sleep_interval)
// If we haven't slept in sleep_interval ticks, sleep to allow other work to proceed.
sleep(0)
last_slept = world.timeofday
return 1
datum/controller/process/proc/update()
return 0
/datum/controller/process/proc/update()
// Clear delta
if(previousStatus != status)
setStatus(status)
var/elapsedTime = getElapsedTime()
if (elapsedTime > hang_restart_time)
if (hung)
handleHung()
return
else if (elapsedTime > hang_restart_time)
hung()
else if (elapsedTime > hang_alert_time)
setStatus(PROCESS_STATUS_PROBABLY_HUNG)
else if (elapsedTime > hang_warning_time)
setStatus(PROCESS_STATUS_MAYBE_HUNG)
datum/controller/process/proc/getElapsedTime()
if (world.timeofday < run_start)
return world.timeofday - (run_start - 864000)
return world.timeofday - run_start
datum/controller/process/proc/tickDetail()
/datum/controller/process/proc/getElapsedTime()
var/timeofhour = TimeOfHour
if (timeofhour < run_start)
return timeofhour - (run_start - 36000)
return timeofhour - run_start
/datum/controller/process/proc/tickDetail()
return
datum/controller/process/proc/getContext()
/datum/controller/process/proc/getContext()
return "<tr><td>[name]</td><td>[main.averageRunTime(src)]</td><td>[main.last_run_time[src]]</td><td>[main.highest_run_time[src]]</td><td>[ticks]</td></tr>\n"
datum/controller/process/proc/getContextData()
/datum/controller/process/proc/getContextData()
return list(
"name" = name,
"averageRunTime" = main.averageRunTime(src),
@@ -246,10 +262,10 @@ datum/controller/process/proc/getContextData()
"disabled" = disabled
)
datum/controller/process/proc/getStatus()
/datum/controller/process/proc/getStatus()
return status
datum/controller/process/proc/getStatusText(var/s = 0)
/datum/controller/process/proc/getStatusText(var/s = 0)
if(!s)
s = status
switch(s)
@@ -268,55 +284,89 @@ datum/controller/process/proc/getStatusText(var/s = 0)
else
return "UNKNOWN (?)"
datum/controller/process/proc/getPreviousStatus()
/datum/controller/process/proc/getPreviousStatus()
return previousStatus
datum/controller/process/proc/getPreviousStatusText()
/datum/controller/process/proc/getPreviousStatusText()
return getStatusText(previousStatus)
datum/controller/process/proc/setStatus(var/newStatus)
/datum/controller/process/proc/setStatus(var/newStatus)
previousStatus = status
status = newStatus
datum/controller/process/proc/setLastTask(var/task, var/object)
/datum/controller/process/proc/setLastTask(var/task, var/object)
last_task = task
last_object = object
datum/controller/process/proc/_copyStateFrom(var/datum/controller/process/target)
/datum/controller/process/proc/_copyStateFrom(var/datum/controller/process/target)
main = target.main
name = target.name
schedule_interval = target.schedule_interval
sleep_interval = target.sleep_interval
last_slept = 0
run_start = 0
tick_start = 0
times_killed = target.times_killed
ticks = target.ticks
last_task = target.last_task
last_object = target.last_object
copyStateFrom(target)
datum/controller/process/proc/copyStateFrom(var/datum/controller/process/target)
/datum/controller/process/proc/copyStateFrom(var/datum/controller/process/target)
datum/controller/process/proc/onKill()
/datum/controller/process/proc/onKill()
datum/controller/process/proc/onStart()
/datum/controller/process/proc/onStart()
datum/controller/process/proc/onFinish()
/datum/controller/process/proc/onFinish()
datum/controller/process/proc/disable()
/datum/controller/process/proc/disable()
disabled = 1
datum/controller/process/proc/enable()
/datum/controller/process/proc/enable()
disabled = 0
/datum/controller/process/proc/getAverageRunTime()
return main.averageRunTime(src)
/datum/controller/process/proc/getLastRunTime()
return main.getProcessLastRunTime(src)
/datum/controller/process/proc/getHighestRunTime()
return main.getProcessHighestRunTime(src)
/datum/controller/process/proc/getTicks()
return ticks
/datum/controller/process/proc/getStatName()
return name
/datum/controller/process/proc/statProcess()
var/averageRunTime = round(getAverageRunTime(), 0.1)/10
var/lastRunTime = round(getLastRunTime(), 0.1)/10
var/highestRunTime = round(getHighestRunTime(), 0.1)/10
stat("[name]", "T#[getTicks()] | AR [averageRunTime] | LR [lastRunTime] | HR [highestRunTime] | D [cpu_defer_count]")
/datum/controller/process/proc/getTickTime()
return "#[getTicks()]\t- [getLastRunTime()]"
/datum/controller/process/proc/catchException(var/exception/e, var/thrower)
var/etext = "[e]"
var/eid = "[e]" // Exception ID, for tracking repeated exceptions
var/ptext = "" // "processing..." text, for what was being processed (if known)
if(istype(e))
etext += " in [e.file], line [e.line]"
eid = "[e.file]:[e.line]"
if(eid in exceptions)
if(exceptions[eid]++ >= 10)
return
else
exceptions[eid] = 1
if(istype(thrower, /datum))
var/datum/D = thrower
ptext = " processing [D.type]"
if(istype(thrower, /atom))
var/atom/A = thrower
ptext += " ([A]) ([A.x],[A.y],[A.z])"
log_to_dd("\[[time_stamp()]\] Process [name] caught exception[ptext]: [etext]")
if(exceptions[eid] >= 10)
log_to_dd("This exception will now be ignored for ten minutes.")
spawn(6000)
exceptions[eid] = 0
/datum/controller/process/proc/catchBadType(var/datum/caught)
if(isnull(caught) || !istype(caught) || !isnull(caught.gcDestroyed))
return // Only bother with types we can identify and that don't belong
catchException("Type [caught.type] does not belong in process' queue")

View File

@@ -1,320 +1,345 @@
// Singleton instance of game_controller_new, setup in world.New()
var/global/datum/controller/processScheduler/processScheduler
/datum/controller/processScheduler
// Processes known by the scheduler
var/tmp/datum/controller/process/list/processes = new
// Processes that are currently running
var/tmp/datum/controller/process/list/running = new
// Processes that are idle
var/tmp/datum/controller/process/list/idle = new
// Processes that are queued to run
var/tmp/datum/controller/process/list/queued = new
// Process name -> process object map
var/tmp/datum/controller/process/list/nameToProcessMap = new
// Process last start times
var/tmp/datum/controller/process/list/last_start = new
// Process last run durations
var/tmp/datum/controller/process/list/last_run_time = new
// Per process list of the last 20 durations
var/tmp/datum/controller/process/list/last_twenty_run_times = new
// Process highest run time
var/tmp/datum/controller/process/list/highest_run_time = new
// Sleep 1 tick -- This may be too aggressive.
var/tmp/scheduler_sleep_interval = 1
// Controls whether the scheduler is running or not
var/tmp/isRunning = 0
// Setup for these processes will be deferred until all the other processes are set up.
var/tmp/list/deferredSetupList = new
/**
* deferSetupFor
* @param path processPath
* If a process needs to be initialized after everything else, add it to
* the deferred setup list. On goonstation, only the ticker needs to have
* this treatment.
*/
/datum/controller/processScheduler/proc/deferSetupFor(var/processPath)
if (!(processPath in deferredSetupList))
deferredSetupList += processPath
/datum/controller/processScheduler/proc/setup()
// There can be only one
if(processScheduler && (processScheduler != src))
del(src)
return 0
var/process
// Add all the processes we can find, except for the ticker
for (process in typesof(/datum/controller/process) - /datum/controller/process)
if (!(process in deferredSetupList))
addProcess(new process(src))
for (process in deferredSetupList)
addProcess(new process(src))
/datum/controller/processScheduler/proc/start()
isRunning = 1
spawn(0)
process()
/datum/controller/processScheduler/proc/process()
while(isRunning)
checkRunningProcesses()
queueProcesses()
runQueuedProcesses()
sleep(scheduler_sleep_interval)
/datum/controller/processScheduler/proc/stop()
isRunning = 0
/datum/controller/processScheduler/proc/checkRunningProcesses()
for(var/datum/controller/process/p in running)
p.update()
if (isnull(p)) // Process was killed
continue
var/status = p.getStatus()
var/previousStatus = p.getPreviousStatus()
// Check status changes
if(status != previousStatus)
//Status changed.
switch(status)
if(PROCESS_STATUS_MAYBE_HUNG)
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
if(PROCESS_STATUS_PROBABLY_HUNG)
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
if(PROCESS_STATUS_HUNG)
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
p.handleHung()
/datum/controller/processScheduler/proc/queueProcesses()
for(var/datum/controller/process/p in processes)
// Don't double-queue, don't queue running processes
if (p.disabled || p.running || p.queued || !p.idle)
continue
// If world.timeofday has rolled over, then we need to adjust.
if (world.timeofday < last_start[p])
last_start[p] -= 864000
// If the process should be running by now, go ahead and queue it
if (world.timeofday > last_start[p] + p.schedule_interval)
setQueuedProcessState(p)
/datum/controller/processScheduler/proc/runQueuedProcesses()
for(var/datum/controller/process/p in queued)
runProcess(p)
/datum/controller/processScheduler/proc/addProcess(var/datum/controller/process/process)
processes.Add(process)
process.idle()
idle.Add(process)
// init recordkeeping vars
last_start.Add(process)
last_start[process] = 0
last_run_time.Add(process)
last_run_time[process] = 0
last_twenty_run_times.Add(process)
last_twenty_run_times[process] = list()
highest_run_time.Add(process)
highest_run_time[process] = 0
// init starts and stops record starts
recordStart(process, 0)
recordEnd(process, 0)
// Set up process
process.setup()
// Save process in the name -> process map
nameToProcessMap[process.name] = process
/datum/controller/processScheduler/proc/replaceProcess(var/datum/controller/process/oldProcess, var/datum/controller/process/newProcess)
processes.Remove(oldProcess)
processes.Add(newProcess)
newProcess.idle()
idle.Remove(oldProcess)
running.Remove(oldProcess)
queued.Remove(oldProcess)
idle.Add(newProcess)
last_start.Remove(oldProcess)
last_start.Add(newProcess)
last_start[newProcess] = 0
last_run_time.Add(newProcess)
last_run_time[newProcess] = last_run_time[oldProcess]
last_run_time.Remove(oldProcess)
last_twenty_run_times.Add(newProcess)
last_twenty_run_times[newProcess] = last_twenty_run_times[oldProcess]
last_twenty_run_times.Remove(oldProcess)
highest_run_time.Add(newProcess)
highest_run_time[newProcess] = highest_run_time[oldProcess]
highest_run_time.Remove(oldProcess)
recordStart(newProcess, 0)
recordEnd(newProcess, 0)
nameToProcessMap[newProcess.name] = newProcess
/datum/controller/processScheduler/proc/runProcess(var/datum/controller/process/process)
spawn(0)
process.process()
/datum/controller/processScheduler/proc/processStarted(var/datum/controller/process/process)
setRunningProcessState(process)
recordStart(process)
/datum/controller/processScheduler/proc/processFinished(var/datum/controller/process/process)
setIdleProcessState(process)
recordEnd(process)
/datum/controller/processScheduler/proc/setIdleProcessState(var/datum/controller/process/process)
if (process in running)
running -= process
if (process in queued)
queued -= process
if (!(process in idle))
idle += process
process.idle()
/datum/controller/processScheduler/proc/setQueuedProcessState(var/datum/controller/process/process)
if (process in running)
running -= process
if (process in idle)
idle -= process
if (!(process in queued))
queued += process
// The other state transitions are handled internally by the process.
process.queued()
/datum/controller/processScheduler/proc/setRunningProcessState(var/datum/controller/process/process)
if (process in queued)
queued -= process
if (process in idle)
idle -= process
if (!(process in running))
running += process
process.running()
/datum/controller/processScheduler/proc/recordStart(var/datum/controller/process/process, var/time = null)
if (isnull(time))
time = world.timeofday
last_start[process] = time
/datum/controller/processScheduler/proc/recordEnd(var/datum/controller/process/process, var/time = null)
if (isnull(time))
time = world.timeofday
// If world.timeofday has rolled over, then we need to adjust.
if (time < last_start[process])
last_start[process] -= 864000
var/lastRunTime = time - last_start[process]
if(lastRunTime < 0)
lastRunTime = 0
recordRunTime(process, lastRunTime)
/**
* recordRunTime
* Records a run time for a process
*/
/datum/controller/processScheduler/proc/recordRunTime(var/datum/controller/process/process, time)
last_run_time[process] = time
if(time > highest_run_time[process])
highest_run_time[process] = time
var/list/lastTwenty = last_twenty_run_times[process]
if (lastTwenty.len == 20)
lastTwenty.Cut(1, 2)
lastTwenty.len++
lastTwenty[lastTwenty.len] = time
/**
* averageRunTime
* returns the average run time (over the last 20) of the process
*/
/datum/controller/processScheduler/proc/averageRunTime(var/datum/controller/process/process)
var/lastTwenty = last_twenty_run_times[process]
var/t = 0
var/c = 0
for(var/time in lastTwenty)
t += time
c++
if(c > 0)
return t / c
return c
/datum/controller/processScheduler/proc/getStatusData()
var/list/data = new
for (var/datum/controller/process/p in processes)
data.len++
data[data.len] = p.getContextData()
return data
/datum/controller/processScheduler/proc/getProcessCount()
return processes.len
/datum/controller/processScheduler/proc/hasProcess(var/processName as text)
if (nameToProcessMap[processName])
return 1
/datum/controller/processScheduler/proc/killProcess(var/processName as text)
restartProcess(processName)
/datum/controller/processScheduler/proc/restartProcess(var/processName as text)
if (hasProcess(processName))
var/datum/controller/process/oldInstance = nameToProcessMap[processName]
var/datum/controller/process/newInstance = new oldInstance.type(src)
newInstance._copyStateFrom(oldInstance)
replaceProcess(oldInstance, newInstance)
oldInstance.kill()
/datum/controller/processScheduler/proc/enableProcess(var/processName as text)
if (hasProcess(processName))
var/datum/controller/process/process = nameToProcessMap[processName]
process.enable()
/datum/controller/processScheduler/proc/disableProcess(var/processName as text)
if (hasProcess(processName))
var/datum/controller/process/process = nameToProcessMap[processName]
process.disable()
/datum/controller/processScheduler/proc/getProcess(var/name)
return nameToProcessMap[name]
/datum/controller/processScheduler/proc/getProcessLastRunTime(var/datum/controller/process/process)
return last_run_time[process]
/datum/controller/processScheduler/proc/getIsRunning()
return isRunning
// Singleton instance of game_controller_new, setup in world.New()
var/global/datum/controller/processScheduler/processScheduler
/datum/controller/processScheduler
// Processes known by the scheduler
var/tmp/datum/controller/process/list/processes = new
// Processes that are currently running
var/tmp/datum/controller/process/list/running = new
// Processes that are idle
var/tmp/datum/controller/process/list/idle = new
// Processes that are queued to run
var/tmp/datum/controller/process/list/queued = new
// Process name -> process object map
var/tmp/datum/controller/process/list/nameToProcessMap = new
// Process last queued times (world time)
var/tmp/datum/controller/process/list/last_queued = new
// Process last start times (real time)
var/tmp/datum/controller/process/list/last_start = new
// Process last run durations
var/tmp/datum/controller/process/list/last_run_time = new
// Per process list of the last 20 durations
var/tmp/datum/controller/process/list/last_twenty_run_times = new
// Process highest run time
var/tmp/datum/controller/process/list/highest_run_time = new
// How long to sleep between runs (set to tick_lag in New)
var/tmp/scheduler_sleep_interval
// Controls whether the scheduler is running or not
var/tmp/isRunning = 0
// Setup for these processes will be deferred until all the other processes are set up.
var/tmp/list/deferredSetupList = new
var/tmp/currentTick = 0
var/tmp/currentTickStart = 0
var/tmp/cpuAverage = 0
/datum/controller/processScheduler/New()
..()
// When the process scheduler is first new'd, tick_lag may be wrong, so these
// get re-initialized when the process scheduler is started.
// (These are kept here for any processes that decide to process before round start)
scheduler_sleep_interval = world.tick_lag
/**
* deferSetupFor
* @param path processPath
* If a process needs to be initialized after everything else, add it to
* the deferred setup list. On goonstation, only the ticker needs to have
* this treatment.
*/
/datum/controller/processScheduler/proc/deferSetupFor(var/processPath)
if (!(processPath in deferredSetupList))
deferredSetupList += processPath
/datum/controller/processScheduler/proc/setup()
// There can be only one
if(processScheduler && (processScheduler != src))
del(src)
return 0
var/process
// Add all the processes we can find, except for the ticker
for (process in subtypesof(/datum/controller/process))
if (!(process in deferredSetupList))
addProcess(new process(src))
for (process in deferredSetupList)
addProcess(new process(src))
/datum/controller/processScheduler/proc/start()
isRunning = 1
// tick_lag will have been set by now, so re-initialize these
scheduler_sleep_interval = world.tick_lag
updateStartDelays()
spawn(0)
process()
/datum/controller/processScheduler/proc/process()
while(isRunning)
checkRunningProcesses()
queueProcesses()
runQueuedProcesses()
sleep(scheduler_sleep_interval)
/datum/controller/processScheduler/proc/stop()
isRunning = 0
/datum/controller/processScheduler/proc/checkRunningProcesses()
for(var/datum/controller/process/p in running)
p.update()
if (isnull(p)) // Process was killed
continue
var/status = p.getStatus()
var/previousStatus = p.getPreviousStatus()
// Check status changes
if(status != previousStatus)
//Status changed.
switch(status)
if(PROCESS_STATUS_PROBABLY_HUNG)
message_admins("Process '[p.name]' may be hung.")
if(PROCESS_STATUS_HUNG)
message_admins("Process '[p.name]' is hung and will be restarted.")
/datum/controller/processScheduler/proc/queueProcesses()
for(var/datum/controller/process/p in processes)
// Don't double-queue, don't queue running processes
if (p.disabled || p.running || p.queued || !p.idle)
continue
// If the process should be running by now, go ahead and queue it
if (world.time >= last_queued[p] + p.schedule_interval)
setQueuedProcessState(p)
/datum/controller/processScheduler/proc/runQueuedProcesses()
for(var/datum/controller/process/p in queued)
runProcess(p)
/datum/controller/processScheduler/proc/addProcess(var/datum/controller/process/process)
processes.Add(process)
process.idle()
idle.Add(process)
// init recordkeeping vars
last_start.Add(process)
last_start[process] = 0
last_run_time.Add(process)
last_run_time[process] = 0
last_twenty_run_times.Add(process)
last_twenty_run_times[process] = list()
highest_run_time.Add(process)
highest_run_time[process] = 0
// init starts and stops record starts
recordStart(process, 0)
recordEnd(process, 0)
// Set up process
process.setup()
// Save process in the name -> process map
nameToProcessMap[process.name] = process
/datum/controller/processScheduler/proc/replaceProcess(var/datum/controller/process/oldProcess, var/datum/controller/process/newProcess)
processes.Remove(oldProcess)
processes.Add(newProcess)
newProcess.idle()
idle.Remove(oldProcess)
running.Remove(oldProcess)
queued.Remove(oldProcess)
idle.Add(newProcess)
last_start.Remove(oldProcess)
last_start.Add(newProcess)
last_start[newProcess] = 0
last_run_time.Add(newProcess)
last_run_time[newProcess] = last_run_time[oldProcess]
last_run_time.Remove(oldProcess)
last_twenty_run_times.Add(newProcess)
last_twenty_run_times[newProcess] = last_twenty_run_times[oldProcess]
last_twenty_run_times.Remove(oldProcess)
highest_run_time.Add(newProcess)
highest_run_time[newProcess] = highest_run_time[oldProcess]
highest_run_time.Remove(oldProcess)
recordStart(newProcess, 0)
recordEnd(newProcess, 0)
nameToProcessMap[newProcess.name] = newProcess
/datum/controller/processScheduler/proc/updateStartDelays()
for(var/datum/controller/process/p in processes)
if(p.start_delay)
last_queued[p] = world.time - p.start_delay
/datum/controller/processScheduler/proc/runProcess(var/datum/controller/process/process)
spawn(0)
process.process()
/datum/controller/processScheduler/proc/processStarted(var/datum/controller/process/process)
setRunningProcessState(process)
recordStart(process)
/datum/controller/processScheduler/proc/processFinished(var/datum/controller/process/process)
setIdleProcessState(process)
recordEnd(process)
/datum/controller/processScheduler/proc/setIdleProcessState(var/datum/controller/process/process)
if (process in running)
running -= process
if (process in queued)
queued -= process
if (!(process in idle))
idle += process
/datum/controller/processScheduler/proc/setQueuedProcessState(var/datum/controller/process/process)
if (process in running)
running -= process
if (process in idle)
idle -= process
if (!(process in queued))
queued += process
// The other state transitions are handled internally by the process.
process.queued()
/datum/controller/processScheduler/proc/setRunningProcessState(var/datum/controller/process/process)
if (process in queued)
queued -= process
if (process in idle)
idle -= process
if (!(process in running))
running += process
/datum/controller/processScheduler/proc/recordStart(var/datum/controller/process/process, var/time = null)
if (isnull(time))
time = TimeOfHour
last_queued[process] = world.time
last_start[process] = time
else
last_queued[process] = (time == 0 ? 0 : world.time)
last_start[process] = time
/datum/controller/processScheduler/proc/recordEnd(var/datum/controller/process/process, var/time = null)
if (isnull(time))
time = TimeOfHour
// If world.timeofday has rolled over, then we need to adjust.
if (time < last_start[process])
last_start[process] -= 36000
var/lastRunTime = time - last_start[process]
if(lastRunTime < 0)
lastRunTime = 0
recordRunTime(process, lastRunTime)
/**
* recordRunTime
* Records a run time for a process
*/
/datum/controller/processScheduler/proc/recordRunTime(var/datum/controller/process/process, time)
last_run_time[process] = time
if(time > highest_run_time[process])
highest_run_time[process] = time
var/list/lastTwenty = last_twenty_run_times[process]
if (lastTwenty.len == 20)
lastTwenty.Cut(1, 2)
lastTwenty.len++
lastTwenty[lastTwenty.len] = time
/**
* averageRunTime
* returns the average run time (over the last 20) of the process
*/
/datum/controller/processScheduler/proc/averageRunTime(var/datum/controller/process/process)
var/lastTwenty = last_twenty_run_times[process]
var/t = 0
var/c = 0
for(var/time in lastTwenty)
t += time
c++
if(c > 0)
return t / c
return c
/datum/controller/processScheduler/proc/getProcessLastRunTime(var/datum/controller/process/process)
return last_run_time[process]
/datum/controller/processScheduler/proc/getProcessHighestRunTime(var/datum/controller/process/process)
return highest_run_time[process]
/datum/controller/processScheduler/proc/getStatusData()
var/list/data = new
for (var/datum/controller/process/p in processes)
data.len++
data[data.len] = p.getContextData()
return data
/datum/controller/processScheduler/proc/getProcessCount()
return processes.len
/datum/controller/processScheduler/proc/hasProcess(var/processName as text)
if (nameToProcessMap[processName])
return 1
/datum/controller/processScheduler/proc/killProcess(var/processName as text)
restartProcess(processName)
/datum/controller/processScheduler/proc/restartProcess(var/processName as text)
if (hasProcess(processName))
var/datum/controller/process/oldInstance = nameToProcessMap[processName]
var/datum/controller/process/newInstance = new oldInstance.type(src)
newInstance._copyStateFrom(oldInstance)
replaceProcess(oldInstance, newInstance)
oldInstance.kill()
/datum/controller/processScheduler/proc/enableProcess(var/processName as text)
if (hasProcess(processName))
var/datum/controller/process/process = nameToProcessMap[processName]
process.enable()
/datum/controller/processScheduler/proc/disableProcess(var/processName as text)
if (hasProcess(processName))
var/datum/controller/process/process = nameToProcessMap[processName]
process.disable()
/datum/controller/processScheduler/proc/sign(var/x)
if (x == 0)
return 1
return x / abs(x)
/datum/controller/processScheduler/proc/statProcesses()
if(!isRunning)
stat("Processes", "Scheduler not running")
return
stat("Processes", "[processes.len] (R [running.len] / Q [queued.len] / I [idle.len])")
stat(null, "[round(cpuAverage, 0.1)] CPU")
for(var/datum/controller/process/p in processes)
p.statProcess()

View File

@@ -1,127 +0,0 @@
/**
* updateQueue.dm
*/
#ifdef UPDATE_QUEUE_DEBUG
#define uq_dbg(text) world << text
#else
#define uq_dbg(text)
#endif
/datum/updateQueue
var/tmp/list/objects
var/tmp/previousStart
var/tmp/procName
var/tmp/list/arguments
var/tmp/datum/updateQueueWorker/currentWorker
var/tmp/workerTimeout
var/tmp/adjustedWorkerTimeout
var/tmp/currentKillCount
var/tmp/totalKillCount
/datum/updateQueue/New(list/objects = list(), procName = "update", list/arguments = list(), workerTimeout = 2, inplace = 0)
..()
uq_dbg("Update queue created.")
// Init proc allows for recycling the worker.
init(objects = objects, procName = procName, arguments = arguments, workerTimeout = workerTimeout, inplace = inplace)
/**
* init
* @param list objects objects to update
* @param text procName the proc to call on each item in the object list
* @param list arguments optional arguments to pass to the update proc
* @param number workerTimeout number of ticks to wait for an update to
finish before forking a new update worker
* @param bool inplace whether the updateQueue should make a copy of objects.
the internal list will be modified, so it is usually
a good idea to leave this alone. Default behavior is to
copy.
*/
/datum/updateQueue/proc/init(list/objects = list(), procName = "update", list/arguments = list(), workerTimeout = 2, inplace = 0)
uq_dbg("Update queue initialization started.")
if (!inplace)
// Make an internal copy of the list so we're not modifying the original.
initList(objects)
else
src.objects = objects
// Init vars
src.procName = procName
src.arguments = arguments
src.workerTimeout = workerTimeout
adjustedWorkerTimeout = workerTimeout
currentKillCount = 0
totalKillCount = 0
uq_dbg("Update queue initialization finished. procName = '[procName]'")
/datum/updateQueue/proc/initList(list/toCopy)
/**
* We will copy the list in reverse order, as our doWork proc
* will access them by popping an element off the end of the list.
* This ends up being quite a lot faster than taking elements off
* the head of the list.
*/
objects = new
uq_dbg("Copying [toCopy.len] items for processing.")
for(var/i=toCopy.len,i>0,)
objects.len++
objects[objects.len] = toCopy[i--]
/datum/updateQueue/proc/Run()
uq_dbg("Starting run...")
startWorker()
while (istype(currentWorker) && !currentWorker.finished)
sleep(2)
checkWorker()
uq_dbg("UpdateQueue completed run.")
/datum/updateQueue/proc/checkWorker()
if(istype(currentWorker))
// If world.timeofday has rolled over, then we need to adjust.
if(world.timeofday < currentWorker.lastStart)
currentWorker.lastStart -= 864000
if(world.timeofday - currentWorker.lastStart > adjustedWorkerTimeout)
// This worker is a bit slow, let's spawn a new one and kill the old one.
uq_dbg("Current worker is lagging... starting a new one.")
killWorker()
startWorker()
else // No worker!
uq_dbg("update queue ended up without a worker... starting a new one...")
startWorker()
/datum/updateQueue/proc/startWorker()
// only run the worker if we have objects to work on
if(objects.len)
uq_dbg("Starting worker process.")
// No need to create a fresh worker if we already have one...
if (istype(currentWorker))
currentWorker.init(objects, procName, arguments)
else
currentWorker = new(objects, procName, arguments)
currentWorker.start()
else
uq_dbg("Queue is empty. No worker was started.")
currentWorker = null
/datum/updateQueue/proc/killWorker()
// Kill the worker
currentWorker.kill()
currentWorker = null
// After we kill a worker, yield so that if the worker's been tying up the cpu, other stuff can immediately resume
sleep(-1)
currentKillCount++
totalKillCount++
if (currentKillCount >= 3)
uq_dbg("[currentKillCount] workers have been killed with a timeout of [adjustedWorkerTimeout]. Increasing worker timeout to compensate.")
adjustedWorkerTimeout++
currentKillCount = 0

View File

@@ -1,83 +0,0 @@
datum/updateQueueWorker
var/tmp/list/objects
var/tmp/killed
var/tmp/finished
var/tmp/procName
var/tmp/list/arguments
var/tmp/lastStart
var/tmp/cpuThreshold
datum/updateQueueWorker/New(var/list/objects, var/procName, var/list/arguments, var/cpuThreshold = 90)
..()
uq_dbg("updateQueueWorker created.")
init(objects, procName, arguments, cpuThreshold)
datum/updateQueueWorker/proc/init(var/list/objects, var/procName, var/list/arguments, var/cpuThreshold = 90)
src.objects = objects
src.procName = procName
src.arguments = arguments
src.cpuThreshold = cpuThreshold
killed = 0
finished = 0
datum/updateQueueWorker/proc/doWork()
// If there's nothing left to execute or we were killed, mark finished and return.
if (!objects || !objects.len) return finished()
lastStart = world.timeofday // Absolute number of ticks since the world started up
var/datum/object = objects[objects.len] // Pull out the object
objects.len-- // Remove the object from the list
if (istype(object) && !isturf(object) && !object.disposed && isnull(object.gcDestroyed)) // We only work with real objects
call(object, procName)(arglist(arguments))
// If there's nothing left to execute
// or we were killed while running the above code, mark finished and return.
if (!objects || !objects.len) return finished()
if (world.cpu > cpuThreshold)
// We don't want to force a tick into overtime!
// If the tick is about to go overtime, spawn the next update to go
// in the next tick.
uq_dbg("tick went into overtime with world.cpu = [world.cpu], deferred next update to next tick [1+(world.time / world.tick_lag)]")
spawn(1)
doWork()
else
spawn(0) // Execute anonymous function immediately as if we were in a while loop...
doWork()
datum/updateQueueWorker/proc/finished()
uq_dbg("updateQueueWorker finished.")
/**
* If the worker was killed while it was working on something, it
* should delete itself when it finally finishes working on it.
* Meanwhile, the updateQueue will have proceeded on with the rest of
* the queue. This will also terminate the spawned function that was
* created in the kill() proc.
*/
if(killed)
del(src)
finished = 1
datum/updateQueueWorker/proc/kill()
uq_dbg("updateQueueWorker killed.")
killed = 1
objects = null
/**
* If the worker is not done in 30 seconds after it's killed,
* we'll forcibly delete it, causing the anonymous function it was
* running to be terminated. Hasta la vista, baby.
*/
spawn(300)
del(src)
datum/updateQueueWorker/proc/start()
uq_dbg("updateQueueWorker started.")
spawn(0)
doWork()