Scheduler now uses btime, and other improvements

This commit is contained in:
Yoshax
2016-06-01 17:47:50 +01:00
parent 9d47019f32
commit 0fb98bbabd
45 changed files with 408 additions and 981 deletions

View File

@@ -1,32 +0,0 @@
// DM Environment file for ProcessScheduler.dme.
// All manual changes should be made outside the BEGIN_ and END_ blocks.
// New source code should be placed in .dm files: choose File/New --> Code File.
// BEGIN_INTERNALS
// END_INTERNALS
// BEGIN_FILE_DIR
#define FILE_DIR .
// END_FILE_DIR
// BEGIN_PREFERENCES
// END_PREFERENCES
// BEGIN_INCLUDE
#include "core\_define.dm"
#include "core\_stubs.dm"
#include "core\process.dm"
#include "core\processScheduler.dm"
#include "core\updateQueue.dm"
#include "core\updateQueueWorker.dm"
#include "test\processSchedulerView.dm"
#include "test\testDyingUpdateQueueProcess.dm"
#include "test\testHarness.dm"
#include "test\testHungProcess.dm"
#include "test\testNiceProcess.dm"
#include "test\testSlowProcess.dm"
#include "test\testUpdateQueue.dm"
#include "test\testUpdateQueueProcess.dm"
#include "test\testZombieProcess.dm"
// END_INCLUDE

View File

@@ -1,17 +0,0 @@
// Process status defines
#define PROCESS_STATUS_IDLE 1
#define PROCESS_STATUS_QUEUED 2
#define PROCESS_STATUS_RUNNING 3
#define PROCESS_STATUS_MAYBE_HUNG 4
#define PROCESS_STATUS_PROBABLY_HUNG 5
#define PROCESS_STATUS_HUNG 6
// Process time thresholds
#define PROCESS_DEFAULT_HANG_WARNING_TIME 300 // 30 seconds
#define PROCESS_DEFAULT_HANG_ALERT_TIME 600 // 60 seconds
#define PROCESS_DEFAULT_HANG_RESTART_TIME 900 // 90 seconds
#define PROCESS_DEFAULT_SCHEDULE_INTERVAL 50 // 50 ticks
#define PROCESS_DEFAULT_SLEEP_INTERVAL 2 // 2 ticks
#define PROCESS_DEFAULT_CPU_THRESHOLD 90 // 90%
//#define UPDATE_QUEUE_DEBUG

View File

@@ -4,15 +4,7 @@
* This file contains constructs that the process scheduler expects to exist
* in a standard ss13 fork.
*/
/*
/**
* message_admins
*
* sends a message to admins
*/
/proc/message_admins(msg)
world << msg
*/
/**
* logTheThing
*
@@ -25,14 +17,3 @@
world << "Diary: \[[diaryType]:[type]] [text]"
else
world << "Log: \[[type]] [text]"
/**
* var/disposed
*
* In goonstation, disposed is set to 1 after an object enters the delete queue
* or the object is placed in an object pool (effectively out-of-play so to speak)
*/
/datum/var/disposed
// Garbage collection (controller).
/datum/var/gcDestroyed
/datum/var/timeDestroyed

View File

@@ -48,7 +48,7 @@
// This controls how often the process will yield (call sleep(0)) while it is running.
// Every concurrent process should sleep periodically while running in order to allow other
// processes to execute concurrently.
var/tmp/sleep_interval = PROCESS_DEFAULT_SLEEP_INTERVAL
var/tmp/sleep_interval
// hang_warning_time - this is the time (in 1/10 seconds) after which the server will begin to show "maybe hung" in the context window
var/tmp/hang_warning_time = PROCESS_DEFAULT_HANG_WARNING_TIME
@@ -59,20 +59,20 @@
// hang_restart_time - After this much time(in 1/10 seconds), the server will automatically kill and restart the process.
var/tmp/hang_restart_time = PROCESS_DEFAULT_HANG_RESTART_TIME
// cpu_threshold - if world.cpu >= cpu_threshold, scheck() will call sleep(1) to defer further work until the next tick. This keeps a process from driving a tick into overtime (causing perceptible lag)
var/tmp/cpu_threshold = PROCESS_DEFAULT_CPU_THRESHOLD
// How many times in the current run has the process deferred work till the next tick?
var/tmp/cpu_defer_count = 0
// How many SCHECKs have been skipped (to limit btime calls)
var/tmp/calls_since_last_scheck = 0
/**
* recordkeeping vars
*/
// Records the time (server ticks) at which the process last finished sleeping
// Records the time (1/10s timeofday) at which the process last finished sleeping
var/tmp/last_slept = 0
// Records the time (s-ticks) at which the process last began running
// Records the time (1/10s timeofday) at which the process last began running
var/tmp/run_start = 0
// Records the number of times this process has been killed and restarted
@@ -85,26 +85,33 @@
var/tmp/last_object
datum/controller/process/New(var/datum/controller/processScheduler/scheduler)
// Counts the number of times an exception has occurred; gets reset after 10
var/tmp/list/exceptions = list()
// Number of deciseconds to delay before starting the process
var/start_delay = 0
/datum/controller/process/New(var/datum/controller/processScheduler/scheduler)
..()
main = scheduler
previousStatus = "idle"
idle()
name = "process"
schedule_interval = 50
sleep_interval = 2
sleep_interval = world.tick_lag / PROCESS_DEFAULT_SLEEP_INTERVAL
last_slept = 0
run_start = 0
ticks = 0
last_task = 0
last_object = null
datum/controller/process/proc/started()
/datum/controller/process/proc/started()
var/timeofhour = TimeOfHour
// Initialize last_slept so we can know when to sleep
last_slept = world.timeofday
last_slept = timeofhour
// Initialize run_start so we can detect hung processes.
run_start = world.timeofday
run_start = timeofhour
// Initialize defer count
cpu_defer_count = 0
@@ -114,65 +121,65 @@ datum/controller/process/proc/started()
onStart()
datum/controller/process/proc/finished()
/datum/controller/process/proc/finished()
ticks++
idle()
main.processFinished(src)
onFinish()
datum/controller/process/proc/doWork()
/datum/controller/process/proc/doWork()
datum/controller/process/proc/setup()
/datum/controller/process/proc/setup()
datum/controller/process/proc/process()
/datum/controller/process/proc/process()
started()
doWork()
finished()
datum/controller/process/proc/running()
/datum/controller/process/proc/running()
idle = 0
queued = 0
running = 1
hung = 0
setStatus(PROCESS_STATUS_RUNNING)
datum/controller/process/proc/idle()
/datum/controller/process/proc/idle()
queued = 0
running = 0
idle = 1
hung = 0
setStatus(PROCESS_STATUS_IDLE)
datum/controller/process/proc/queued()
/datum/controller/process/proc/queued()
idle = 0
running = 0
queued = 1
hung = 0
setStatus(PROCESS_STATUS_QUEUED)
datum/controller/process/proc/hung()
/datum/controller/process/proc/hung()
hung = 1
setStatus(PROCESS_STATUS_HUNG)
datum/controller/process/proc/handleHung()
/datum/controller/process/proc/handleHung()
var/timeofhour = TimeOfHour
var/datum/lastObj = last_object
var/lastObjType = "null"
if(istype(lastObj))
lastObjType = lastObj.type
// If world.timeofday has rolled over, then we need to adjust.
if (world.timeofday < run_start)
run_start -= 864000
var/msg = "[name] process hung at tick #[ticks]. Process was unresponsive for [(world.timeofday - run_start) / 10] seconds and was restarted. Last task: [last_task]. Last Object Type: [lastObjType]"
// If timeofhour has rolled over, then we need to adjust.
if (timeofhour < run_start)
run_start -= 36000
var/msg = "[name] process hung at tick #[ticks]. Process was unresponsive for [(timeofhour - run_start) / 10] seconds and was restarted. Last task: [last_task]. Last Object Type: [lastObjType]"
logTheThing("debug", null, null, msg)
logTheThing("diary", null, null, msg, "debug")
message_admins(msg)
main.restartProcess(src.name)
datum/controller/process/proc/kill()
/datum/controller/process/proc/kill()
if (!killed)
var/msg = "[name] process was killed at tick #[ticks]."
logTheThing("debug", null, null, msg)
@@ -182,59 +189,68 @@ datum/controller/process/proc/kill()
// Allow inheritors to clean up if needed
onKill()
killed = TRUE
// This should del
del(src)
del(src) // This should del
datum/controller/process/proc/scheck(var/tickId = 0)
// Do not call this directly - use SHECK or SCHECK_EVERY
/datum/controller/process/proc/sleepCheck(var/tickId = 0)
calls_since_last_scheck = 0
if (killed)
// The kill proc is the only place where killed is set.
// The kill proc should have deleted this datum, and all sleeping procs that are
// owned by it.
CRASH("A killed process is still running somehow...")
if (hung)
// This will only really help if the doWork proc ends up in an infinite loop.
handleHung()
CRASH("Process [name] hung and was restarted.")
// For each tick the process defers, it increments the cpu_defer_count so we don't
// defer indefinitely
if (world.cpu >= cpu_threshold + cpu_defer_count * 10)
sleep(1)
if (main.getCurrentTickElapsedTime() > main.timeAllowance)
sleep(world.tick_lag)
cpu_defer_count++
last_slept = world.timeofday
last_slept = TimeOfHour
else
// If world.timeofday has rolled over, then we need to adjust.
if (world.timeofday < last_slept)
last_slept -= 864000
var/timeofhour = TimeOfHour
// If timeofhour has rolled over, then we need to adjust.
if (timeofhour < last_slept)
last_slept -= 36000
if (world.timeofday > last_slept + sleep_interval)
// If we haven't slept in sleep_interval ticks, sleep to allow other work to proceed.
if (timeofhour > last_slept + sleep_interval)
// If we haven't slept in sleep_interval deciseconds, sleep to allow other work to proceed.
sleep(0)
last_slept = world.timeofday
last_slept = TimeOfHour
datum/controller/process/proc/update()
/datum/controller/process/proc/update()
// Clear delta
if(previousStatus != status)
setStatus(status)
var/elapsedTime = getElapsedTime()
if (elapsedTime > hang_restart_time)
if (hung)
handleHung()
return
else if (elapsedTime > hang_restart_time)
hung()
else if (elapsedTime > hang_alert_time)
setStatus(PROCESS_STATUS_PROBABLY_HUNG)
else if (elapsedTime > hang_warning_time)
setStatus(PROCESS_STATUS_MAYBE_HUNG)
datum/controller/process/proc/getElapsedTime()
if (world.timeofday < run_start)
return world.timeofday - (run_start - 864000)
return world.timeofday - run_start
datum/controller/process/proc/tickDetail()
/datum/controller/process/proc/getElapsedTime()
var/timeofhour = TimeOfHour
if (timeofhour < run_start)
return timeofhour - (run_start - 36000)
return timeofhour - run_start
/datum/controller/process/proc/tickDetail()
return
datum/controller/process/proc/getContext()
/datum/controller/process/proc/getContext()
return "<tr><td>[name]</td><td>[main.averageRunTime(src)]</td><td>[main.last_run_time[src]]</td><td>[main.highest_run_time[src]]</td><td>[ticks]</td></tr>\n"
datum/controller/process/proc/getContextData()
/datum/controller/process/proc/getContextData()
return list(
"name" = name,
"averageRunTime" = main.averageRunTime(src),
@@ -246,10 +262,10 @@ datum/controller/process/proc/getContextData()
"disabled" = disabled
)
datum/controller/process/proc/getStatus()
/datum/controller/process/proc/getStatus()
return status
datum/controller/process/proc/getStatusText(var/s = 0)
/datum/controller/process/proc/getStatusText(var/s = 0)
if(!s)
s = status
switch(s)
@@ -268,21 +284,21 @@ datum/controller/process/proc/getStatusText(var/s = 0)
else
return "UNKNOWN"
datum/controller/process/proc/getPreviousStatus()
/datum/controller/process/proc/getPreviousStatus()
return previousStatus
datum/controller/process/proc/getPreviousStatusText()
/datum/controller/process/proc/getPreviousStatusText()
return getStatusText(previousStatus)
datum/controller/process/proc/setStatus(var/newStatus)
/datum/controller/process/proc/setStatus(var/newStatus)
previousStatus = status
status = newStatus
datum/controller/process/proc/setLastTask(var/task, var/object)
/datum/controller/process/proc/setLastTask(var/task, var/object)
last_task = task
last_object = object
datum/controller/process/proc/_copyStateFrom(var/datum/controller/process/target)
/datum/controller/process/proc/_copyStateFrom(var/datum/controller/process/target)
main = target.main
name = target.name
schedule_interval = target.schedule_interval
@@ -295,28 +311,62 @@ datum/controller/process/proc/_copyStateFrom(var/datum/controller/process/target
last_object = target.last_object
copyStateFrom(target)
datum/controller/process/proc/copyStateFrom(var/datum/controller/process/target)
/datum/controller/process/proc/copyStateFrom(var/datum/controller/process/target)
datum/controller/process/proc/onKill()
/datum/controller/process/proc/onKill()
datum/controller/process/proc/onStart()
/datum/controller/process/proc/onStart()
datum/controller/process/proc/onFinish()
/datum/controller/process/proc/onFinish()
datum/controller/process/proc/disable()
/datum/controller/process/proc/disable()
disabled = 1
datum/controller/process/proc/enable()
/datum/controller/process/proc/enable()
disabled = 0
/datum/controller/process/proc/getAverageRunTime()
return main.averageRunTime(src)
/datum/controller/process/proc/getLastRunTime()
return main.getProcessLastRunTime(src)
/datum/controller/process/proc/getHighestRunTime()
return main.getProcessHighestRunTime(src)
/datum/controller/process/proc/getTicks()
return ticks
/datum/controller/process/proc/getStatName()
return name
/datum/controller/process/proc/statProcess()
var/averageRunTime = round(getAverageRunTime(), 0.1)/10
var/lastRunTime = round(getLastRunTime(), 0.1)/10
var/highestRunTime = round(getHighestRunTime(), 0.1)/10
stat("[name]", "T#[getTicks()] | AR [averageRunTime] | LR [lastRunTime] | HR [highestRunTime] | D [cpu_defer_count]")
/datum/controller/process/proc/getTickTime()
return "#[getTicks()]\t- [getLastRunTime()]"
/datum/controller/process/proc/catchException(var/exception/e, var/thrower)
var/etext = "[e]"
var/eid = "[e]" // Exception ID, for tracking repeated exceptions
var/ptext = "" // "processing..." text, for what was being processed (if known)
if(istype(e))
etext += " in [e.file], line [e.line]"
eid = "[e.file]:[e.line]"
if(eid in exceptions)
if(exceptions[eid]++ >= 10)
return
else
exceptions[eid] = 1
if(istype(thrower, /datum))
var/datum/D = thrower
ptext = " processing [D.type]"
if(istype(thrower, /atom))
var/atom/A = thrower
ptext += " ([A]) ([A.x],[A.y],[A.z])"
log_to_dd("\[[time_stamp()]\] Process [name] caught exception[ptext]: [etext]")
if(exceptions[eid] >= 10)
log_to_dd("This exception will now be ignored for ten minutes.")
spawn(6000)
exceptions[eid] = 0
/datum/controller/process/proc/catchBadType(var/datum/caught)
if(isnull(caught) || !istype(caught) || !isnull(caught.gcDestroyed))
return // Only bother with types we can identify and that don't belong
catchException("Type [caught.type] does not belong in process' queue")

View File

@@ -17,7 +17,10 @@ var/global/datum/controller/processScheduler/processScheduler
// Process name -> process object map
var/tmp/datum/controller/process/list/nameToProcessMap = new
// Process last start times
// Process last queued times (world time)
var/tmp/datum/controller/process/list/last_queued = new
// Process last start times (real time)
var/tmp/datum/controller/process/list/last_start = new
// Process last run durations
@@ -29,8 +32,8 @@ var/global/datum/controller/processScheduler/processScheduler
// Process highest run time
var/tmp/datum/controller/process/list/highest_run_time = new
// Sleep 1 tick -- This may be too aggressive.
var/tmp/scheduler_sleep_interval = 1
// How long to sleep between runs (set to tick_lag in New)
var/tmp/scheduler_sleep_interval
// Controls whether the scheduler is running or not
var/tmp/isRunning = 0
@@ -38,6 +41,25 @@ var/global/datum/controller/processScheduler/processScheduler
// Setup for these processes will be deferred until all the other processes are set up.
var/tmp/list/deferredSetupList = new
var/tmp/currentTick = 0
var/tmp/currentTickStart = 0
var/tmp/timeAllowance = 0
var/tmp/cpuAverage = 0
var/tmp/timeAllowanceMax = 0
/datum/controller/processScheduler/New()
..()
// When the process scheduler is first new'd, tick_lag may be wrong, so these
// get re-initialized when the process scheduler is started.
// (These are kept here for any processes that decide to process before round start)
scheduler_sleep_interval = world.tick_lag
timeAllowance = world.tick_lag * 0.5
timeAllowanceMax = world.tick_lag
/**
* deferSetupFor
* @param path processPath
@@ -57,7 +79,7 @@ var/global/datum/controller/processScheduler/processScheduler
var/process
// Add all the processes we can find, except for the ticker
for (process in typesof(/datum/controller/process) - /datum/controller/process)
for (process in subtypesof(/datum/controller/process))
if (!(process in deferredSetupList))
addProcess(new process(src))
@@ -66,11 +88,22 @@ var/global/datum/controller/processScheduler/processScheduler
/datum/controller/processScheduler/proc/start()
isRunning = 1
// tick_lag will have been set by now, so re-initialize these
scheduler_sleep_interval = world.tick_lag
timeAllowance = world.tick_lag * 0.5
timeAllowanceMax = world.tick_lag
updateStartDelays()
spawn(0)
process()
/datum/controller/processScheduler/proc/process()
updateCurrentTickData()
for(var/i=world.tick_lag,i<world.tick_lag*50,i+=world.tick_lag)
spawn(i) updateCurrentTickData()
while(isRunning)
// Hopefully spawning this for 50 ticks in the future will make it the first thing in the queue.
spawn(world.tick_lag*50) updateCurrentTickData()
checkRunningProcesses()
queueProcesses()
runQueuedProcesses()
@@ -92,15 +125,11 @@ var/global/datum/controller/processScheduler/processScheduler
// Check status changes
if(status != previousStatus)
//Status changed.
switch(status)
if(PROCESS_STATUS_MAYBE_HUNG)
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
if(PROCESS_STATUS_PROBABLY_HUNG)
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
message_admins("Process '[p.name]' may be hung.")
if(PROCESS_STATUS_HUNG)
message_admins("Process '[p.name]' is [p.getStatusText(status)].")
p.handleHung()
message_admins("Process '[p.name]' is hung and will be restarted.")
/datum/controller/processScheduler/proc/queueProcesses()
for(var/datum/controller/process/p in processes)
@@ -108,12 +137,8 @@ var/global/datum/controller/processScheduler/processScheduler
if (p.disabled || p.running || p.queued || !p.idle)
continue
// If world.timeofday has rolled over, then we need to adjust.
if (world.timeofday < last_start[p])
last_start[p] -= 864000
// If the process should be running by now, go ahead and queue it
if (world.timeofday > last_start[p] + p.schedule_interval)
if (world.time >= last_queued[p] + p.schedule_interval)
setQueuedProcessState(p)
/datum/controller/processScheduler/proc/runQueuedProcesses()
@@ -176,6 +201,10 @@ var/global/datum/controller/processScheduler/processScheduler
nameToProcessMap[newProcess.name] = newProcess
/datum/controller/processScheduler/proc/updateStartDelays()
for(var/datum/controller/process/p in processes)
if(p.start_delay)
last_queued[p] = world.time - p.start_delay
/datum/controller/processScheduler/proc/runProcess(var/datum/controller/process/process)
spawn(0)
@@ -197,8 +226,6 @@ var/global/datum/controller/processScheduler/processScheduler
if (!(process in idle))
idle += process
process.idle()
/datum/controller/processScheduler/proc/setQueuedProcessState(var/datum/controller/process/process)
if (process in running)
running -= process
@@ -218,21 +245,22 @@ var/global/datum/controller/processScheduler/processScheduler
if (!(process in running))
running += process
process.running()
/datum/controller/processScheduler/proc/recordStart(var/datum/controller/process/process, var/time = null)
if (isnull(time))
time = world.timeofday
last_start[process] = time
time = TimeOfHour
last_queued[process] = world.time
last_start[process] = time
else
last_queued[process] = (time == 0 ? 0 : world.time)
last_start[process] = time
/datum/controller/processScheduler/proc/recordEnd(var/datum/controller/process/process, var/time = null)
if (isnull(time))
time = world.timeofday
time = TimeOfHour
// If world.timeofday has rolled over, then we need to adjust.
if (time < last_start[process])
last_start[process] -= 864000
last_start[process] -= 36000
var/lastRunTime = time - last_start[process]
@@ -273,6 +301,12 @@ var/global/datum/controller/processScheduler/processScheduler
return t / c
return c
/datum/controller/processScheduler/proc/getProcessLastRunTime(var/datum/controller/process/process)
return last_run_time[process]
/datum/controller/processScheduler/proc/getProcessHighestRunTime(var/datum/controller/process/process)
return highest_run_time[process]
/datum/controller/processScheduler/proc/getStatusData()
var/list/data = new
@@ -310,11 +344,39 @@ var/global/datum/controller/processScheduler/processScheduler
var/datum/controller/process/process = nameToProcessMap[processName]
process.disable()
/datum/controller/processScheduler/proc/getProcess(var/name)
return nameToProcessMap[name]
/datum/controller/processScheduler/proc/getCurrentTickElapsedTime()
if (world.time > currentTick)
updateCurrentTickData()
return 0
else
return TimeOfHour - currentTickStart
/datum/controller/processScheduler/proc/getProcessLastRunTime(var/datum/controller/process/process)
return last_run_time[process]
/datum/controller/processScheduler/proc/updateCurrentTickData()
if (world.time > currentTick)
// New tick!
currentTick = world.time
currentTickStart = TimeOfHour
updateTimeAllowance()
cpuAverage = (world.cpu + cpuAverage + cpuAverage) / 3
/datum/controller/processScheduler/proc/getIsRunning()
return isRunning
/datum/controller/processScheduler/proc/updateTimeAllowance()
// Time allowance goes down linearly with world.cpu.
var/tmp/error = cpuAverage - 100
var/tmp/timeAllowanceDelta = sign(error) * -0.5 * world.tick_lag * max(0, 0.001 * abs(error))
//timeAllowance = world.tick_lag * min(1, 0.5 * ((200/max(1,cpuAverage)) - 1))
timeAllowance = min(timeAllowanceMax, max(0, timeAllowance + timeAllowanceDelta))
/datum/controller/processScheduler/proc/sign(var/x)
if (x == 0)
return 1
return x / abs(x)
/datum/controller/processScheduler/proc/statProcesses()
if(!isRunning)
stat("Processes", "Scheduler not running")
return
stat("Processes", "[processes.len] (R [running.len] / Q [queued.len] / I [idle.len])")
stat(null, "[round(cpuAverage, 0.1)] CPU, [round(timeAllowance, 0.1)/10] TA")
for(var/datum/controller/process/p in processes)
p.statProcess()

View File

@@ -1,127 +0,0 @@
/**
* updateQueue.dm
*/
#ifdef UPDATE_QUEUE_DEBUG
#define uq_dbg(text) world << text
#else
#define uq_dbg(text)
#endif
/datum/updateQueue
var/tmp/list/objects
var/tmp/previousStart
var/tmp/procName
var/tmp/list/arguments
var/tmp/datum/updateQueueWorker/currentWorker
var/tmp/workerTimeout
var/tmp/adjustedWorkerTimeout
var/tmp/currentKillCount
var/tmp/totalKillCount
/datum/updateQueue/New(list/objects = list(), procName = "update", list/arguments = list(), workerTimeout = 2, inplace = 0)
..()
uq_dbg("Update queue created.")
// Init proc allows for recycling the worker.
init(objects = objects, procName = procName, arguments = arguments, workerTimeout = workerTimeout, inplace = inplace)
/**
* init
* @param list objects objects to update
* @param text procName the proc to call on each item in the object list
* @param list arguments optional arguments to pass to the update proc
* @param number workerTimeout number of ticks to wait for an update to
finish before forking a new update worker
* @param bool inplace whether the updateQueue should make a copy of objects.
the internal list will be modified, so it is usually
a good idea to leave this alone. Default behavior is to
copy.
*/
/datum/updateQueue/proc/init(list/objects = list(), procName = "update", list/arguments = list(), workerTimeout = 2, inplace = 0)
uq_dbg("Update queue initialization started.")
if (!inplace)
// Make an internal copy of the list so we're not modifying the original.
initList(objects)
else
src.objects = objects
// Init vars
src.procName = procName
src.arguments = arguments
src.workerTimeout = workerTimeout
adjustedWorkerTimeout = workerTimeout
currentKillCount = 0
totalKillCount = 0
uq_dbg("Update queue initialization finished. procName = '[procName]'")
/datum/updateQueue/proc/initList(list/toCopy)
/**
* We will copy the list in reverse order, as our doWork proc
* will access them by popping an element off the end of the list.
* This ends up being quite a lot faster than taking elements off
* the head of the list.
*/
objects = new
uq_dbg("Copying [toCopy.len] items for processing.")
for(var/i=toCopy.len,i>0,)
objects.len++
objects[objects.len] = toCopy[i--]
/datum/updateQueue/proc/Run()
uq_dbg("Starting run...")
startWorker()
while (istype(currentWorker) && !currentWorker.finished)
sleep(2)
checkWorker()
uq_dbg("UpdateQueue completed run.")
/datum/updateQueue/proc/checkWorker()
if(istype(currentWorker))
// If world.timeofday has rolled over, then we need to adjust.
if(world.timeofday < currentWorker.lastStart)
currentWorker.lastStart -= 864000
if(world.timeofday - currentWorker.lastStart > adjustedWorkerTimeout)
// This worker is a bit slow, let's spawn a new one and kill the old one.
uq_dbg("Current worker is lagging... starting a new one.")
killWorker()
startWorker()
else // No worker!
uq_dbg("update queue ended up without a worker... starting a new one...")
startWorker()
/datum/updateQueue/proc/startWorker()
// only run the worker if we have objects to work on
if(objects.len)
uq_dbg("Starting worker process.")
// No need to create a fresh worker if we already have one...
if (istype(currentWorker))
currentWorker.init(objects, procName, arguments)
else
currentWorker = new(objects, procName, arguments)
currentWorker.start()
else
uq_dbg("Queue is empty. No worker was started.")
currentWorker = null
/datum/updateQueue/proc/killWorker()
// Kill the worker
currentWorker.kill()
currentWorker = null
// After we kill a worker, yield so that if the worker's been tying up the cpu, other stuff can immediately resume
sleep(-1)
currentKillCount++
totalKillCount++
if (currentKillCount >= 3)
uq_dbg("[currentKillCount] workers have been killed with a timeout of [adjustedWorkerTimeout]. Increasing worker timeout to compensate.")
adjustedWorkerTimeout++
currentKillCount = 0

View File

@@ -1,83 +0,0 @@
datum/updateQueueWorker
var/tmp/list/objects
var/tmp/killed
var/tmp/finished
var/tmp/procName
var/tmp/list/arguments
var/tmp/lastStart
var/tmp/cpuThreshold
datum/updateQueueWorker/New(var/list/objects, var/procName, var/list/arguments, var/cpuThreshold = 90)
..()
uq_dbg("updateQueueWorker created.")
init(objects, procName, arguments, cpuThreshold)
datum/updateQueueWorker/proc/init(var/list/objects, var/procName, var/list/arguments, var/cpuThreshold = 90)
src.objects = objects
src.procName = procName
src.arguments = arguments
src.cpuThreshold = cpuThreshold
killed = 0
finished = 0
datum/updateQueueWorker/proc/doWork()
// If there's nothing left to execute or we were killed, mark finished and return.
if (!objects || !objects.len) return finished()
lastStart = world.timeofday // Absolute number of ticks since the world started up
var/datum/object = objects[objects.len] // Pull out the object
objects.len-- // Remove the object from the list
if (istype(object) && !isturf(object) && !object.disposed && isnull(object.gcDestroyed)) // We only work with real objects
call(object, procName)(arglist(arguments))
// If there's nothing left to execute
// or we were killed while running the above code, mark finished and return.
if (!objects || !objects.len) return finished()
if (world.cpu > cpuThreshold)
// We don't want to force a tick into overtime!
// If the tick is about to go overtime, spawn the next update to go
// in the next tick.
uq_dbg("tick went into overtime with world.cpu = [world.cpu], deferred next update to next tick [1+(world.time / world.tick_lag)]")
spawn(1)
doWork()
else
spawn(0) // Execute anonymous function immediately as if we were in a while loop...
doWork()
datum/updateQueueWorker/proc/finished()
uq_dbg("updateQueueWorker finished.")
/**
* If the worker was killed while it was working on something, it
* should delete itself when it finally finishes working on it.
* Meanwhile, the updateQueue will have proceeded on with the rest of
* the queue. This will also terminate the spawned function that was
* created in the kill() proc.
*/
if(killed)
del(src)
finished = 1
datum/updateQueueWorker/proc/kill()
uq_dbg("updateQueueWorker killed.")
killed = 1
objects = null
/**
* If the worker is not done in 30 seconds after it's killed,
* we'll forcibly delete it, causing the anonymous function it was
* running to be terminated. Hasta la vista, baby.
*/
spawn(300)
del(src)
datum/updateQueueWorker/proc/start()
uq_dbg("updateQueueWorker started.")
spawn(0)
doWork()

View File

@@ -1,94 +0,0 @@
/datum/processSchedulerView
/datum/processSchedulerView/Topic(href, href_list)
if (!href_list["action"])
return
switch (href_list["action"])
if ("kill")
var/toKill = href_list["name"]
processScheduler.killProcess(toKill)
refreshProcessTable()
if ("enable")
var/toEnable = href_list["name"]
processScheduler.enableProcess(toEnable)
refreshProcessTable()
if ("disable")
var/toDisable = href_list["name"]
processScheduler.disableProcess(toDisable)
refreshProcessTable()
if ("refresh")
refreshProcessTable()
/datum/processSchedulerView/proc/refreshProcessTable()
windowCall("handleRefresh", getProcessTable())
/datum/processSchedulerView/proc/windowCall(var/function, var/data = null)
usr << output(data, "processSchedulerContext.browser:[function]")
/datum/processSchedulerView/proc/getProcessTable()
var/text = "<table class=\"table table-striped\"><thead><tr><td>Name</td><td>Avg(s)</td><td>Last(s)</td><td>Highest(s)</td><td>Tickcount</td><td>Tickrate</td><td>State</td><td>Action</td></tr></thead><tbody>"
// and the context of each
for (var/list/data in processScheduler.getStatusData())
text += "<tr>"
text += "<td>[data["name"]]</td>"
text += "<td>[num2text(data["averageRunTime"]/10,3)]</td>"
text += "<td>[num2text(data["lastRunTime"]/10,3)]</td>"
text += "<td>[num2text(data["highestRunTime"]/10,3)]</td>"
text += "<td>[num2text(data["ticks"],4)]</td>"
text += "<td>[data["schedule"]]</td>"
text += "<td>[data["status"]]</td>"
text += "<td><button class=\"btn kill-btn\" data-process-name=\"[data["name"]]\" id=\"kill-[data["name"]]\">Kill</button>"
if (data["disabled"])
text += "<button class=\"btn enable-btn\" data-process-name=\"[data["name"]]\" id=\"enable-[data["name"]]\">Enable</button>"
else
text += "<button class=\"btn disable-btn\" data-process-name=\"[data["name"]]\" id=\"disable-[data["name"]]\">Disable</button>"
text += "</td>"
text += "</tr>"
text += "</tbody></table>"
return text
/**
* getContext
* Outputs an interface showing stats for all processes.
*/
/datum/processSchedulerView/proc/getContext()
bootstrap_browse()
usr << browse('processScheduler.js', "file=processScheduler.js;display=0")
var/text = {"<html><head>
<title>Process Scheduler Detail</title>
<script type="text/javascript">var ref = '\ref[src]';</script>
[bootstrap_includes()]
<script type="text/javascript" src="processScheduler.js"></script>
</head>
<body>
<h2>Process Scheduler</h2>
<div class="btn-group">
<button id="btn-refresh" class="btn">Refresh</button>
</div>
<h3>The process scheduler controls [processScheduler.getProcessCount()] loops.<h3>"}
text += "<div id=\"processTable\">"
text += getProcessTable()
text += "</div></body></html>"
usr << browse(text, "window=processSchedulerContext;size=800x600")
/datum/processSchedulerView/proc/bootstrap_browse()
usr << browse('bower_components/jquery/dist/jquery.min.js', "file=jquery.min.js;display=0")
usr << browse('bower_components/bootstrap2.3.2/bootstrap/js/bootstrap.min.js', "file=bootstrap.min.js;display=0")
usr << browse('bower_components/bootstrap2.3.2/bootstrap/css/bootstrap.min.css', "file=bootstrap.min.css;display=0")
usr << browse('bower_components/bootstrap2.3.2/bootstrap/img/glyphicons-halflings-white.png', "file=glyphicons-halflings-white.png;display=0")
usr << browse('bower_components/bootstrap2.3.2/bootstrap/img/glyphicons-halflings.png', "file=glyphicons-halflings.png;display=0")
usr << browse('bower_components/json2/json2.js', "file=json2.js;display=0")
/datum/processSchedulerView/proc/bootstrap_includes()
return {"
<link rel="stylesheet" href="bootstrap.min.css" />
<script type="text/javascript" src="json2.js"></script>
<script type="text/javascript" src="jquery.min.js"></script>
<script type="text/javascript" src="bootstrap.js"></script>
"}

View File

@@ -1,27 +0,0 @@
/**
* testDyingUpdateQueueProcess
* This process is an example of a process using an updateQueue.
* The datums updated by this process behave badly and block the update loop
* by sleeping. If you #define UPDATE_QUEUE_DEBUG, you will see the updateQueue
* killing off its worker processes and spawning new ones to work around slow
* updates. This means that if you have a code path that sleeps for a long time
* in mob.Life once in a blue moon, the mob update loop will not hang.
*/
/datum/slowTestDatum/proc/wackyUpdateProcessName()
sleep(rand(0,20)) // Randomly REALLY slow :|
/datum/controller/process/testDyingUpdateQueueProcess
var/tmp/datum/updateQueue/updateQueueInstance
var/tmp/list/testDatums = list()
/datum/controller/process/testDyingUpdateQueueProcess/setup()
name = "Dying UpdateQueue Process"
schedule_interval = 30 // every 3 seconds
updateQueueInstance = new
for(var/i = 1, i < 30, i++)
testDatums.Add(new /datum/slowTestDatum)
/datum/controller/process/testDyingUpdateQueueProcess/doWork()
updateQueueInstance.init(testDatums, "wackyUpdateProcessName")
updateQueueInstance.Run()

View File

@@ -1,35 +0,0 @@
/*
These are simple defaults for your project.
*/
#define DEBUG
var/global/datum/processSchedulerView/processSchedulerView
world
loop_checks = 0
New()
..()
processScheduler = new
processSchedulerView = new
mob
step_size = 8
New()
..()
verb
startProcessScheduler()
set name = "Start Process Scheduler"
processScheduler.setup()
processScheduler.start()
getProcessSchedulerContext()
set name = "Get Process Scheduler Status Panel"
processSchedulerView.getContext()
runUpdateQueueTests()
set name = "Run Update Queue Testsuite"
var/datum/updateQueueTests/t = new
t.runTests()

View File

@@ -1,15 +0,0 @@
/**
* testHungProcess
* This process is an example of a simple update loop process that hangs.
*/
/datum/controller/process/testHungProcess/setup()
name = "Hung Process"
schedule_interval = 30 // every 3 seconds
/datum/controller/process/testHungProcess/doWork()
sleep(1000) // FUCK
// scheck is also responsible for handling hung processes. If a process
// hangs, and later resumes, but has already been killed by the scheduler,
// scheck will force the process to bail out.
scheck()

View File

@@ -1,13 +0,0 @@
/**
* testNiceProcess
* This process is an example of a simple update loop process that is
* relatively fast.
*/
/datum/controller/process/testNiceProcess/setup()
name = "Nice Process"
schedule_interval = 10 // every second
/datum/controller/process/testNiceProcess/doWork()
sleep(rand(1,5)) // Just to pretend we're doing something

View File

@@ -1,28 +0,0 @@
/**
* testSlowProcess
* This process is an example of a simple update loop process that is slow.
* The update loop here sleeps inside to provide an example, but if you had
* a computationally intensive loop process that is simply slow, you can use
* scheck() inside the loop to force it to yield periodically according to
* the sleep_interval var. By default, scheck will cause a loop to sleep every
* 2 ticks.
*/
/datum/controller/process/testSlowProcess/setup()
name = "Slow Process"
schedule_interval = 30 // every 3 seconds
/datum/controller/process/testSlowProcess/doWork()
// set background = 1 will cause loop constructs to sleep periodically,
// whenever the BYOND scheduler deems it productive to do so.
// This behavior is not always sufficient, nor is it always consistent.
// Rather than leaving it up to the BYOND scheduler, we can control it
// ourselves and leave nothing to the black box.
set background = 1
for(var/i=1,i<30,i++)
// Just to pretend we're doing something here
sleep(rand(3, 5))
// Forces this loop to yield(sleep) periodically.
scheck()

View File

@@ -1,209 +0,0 @@
var/global/list/updateQueueTestCount = list()
/datum/updateQueueTests
var/start
proc
runTests()
world << "<b>Running 9 tests...</b>"
testUpdateQueuePerformance()
sleep(1)
testInplace()
sleep(1)
testInplaceUpdateQueuePerformance()
sleep(1)
testUpdateQueueReinit()
sleep(1)
testCrashingQueue()
sleep(1)
testEmptyQueue()
sleep(1)
testManySlowItemsInQueue()
sleep(1)
testVariableWorkerTimeout()
sleep(1)
testReallySlowItemInQueue()
sleep(1)
world << "<b>Finished!</b>"
beginTiming()
start = world.time
endTiming(text)
var/time = (world.time - start) / world.tick_lag
world << {"<b><font color="blue">Performance - [text] - <font color="green">[time]</font> ticks</font></b>"}
getCount()
return updateQueueTestCount[updateQueueTestCount.len]
incrementTestCount()
updateQueueTestCount.len++
updateQueueTestCount[updateQueueTestCount.len] = 0
assertCountEquals(count, text)
assertThat(getCount() == count, text)
assertCountLessThan(count, text)
assertThat(getCount() < count, text)
assertCountGreaterThan(count, text)
assertThat(getCount() > count, text)
assertThat(condition, text)
if (condition)
world << {"<font color="green"><b>PASS</b></font>: [text]"}
else
world << {"<b><font color="red">FAIL</font>: [text]</b>"}
testUpdateQueuePerformance()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=100000,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs)
beginTiming()
uq.Run()
endTiming("updating 100000 simple objects")
assertCountEquals(100000, "test that update queue updates all objects expected")
del(objs)
del(uq)
testUpdateQueueReinit()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=100,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs)
uq.Run()
objs = new
for(var/i=1,i<=100,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
uq.init(objs)
uq.Run()
assertCountEquals(200, "test that update queue reinitializes properly and updates all objects as expected.")
del(objs)
del(uq)
testInplace()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=100,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objects = objs, inplace = 1)
uq.Run()
assertThat(objs.len == 0, "test that update queue inplace option really works inplace")
assertCountEquals(100, "test that inplace update queue updates the right number of objects")
del(objs)
del(uq)
testInplaceUpdateQueuePerformance()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=100000,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs)
beginTiming()
uq.Run()
endTiming("updating 100000 simple objects in place")
del(objs)
del(uq)
testCrashingQueue()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=10,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
objs.Add(new /datum/uqTestDatum/crasher(updateQueueTestCount.len))
for(var/i=1,i<=10,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs)
uq.Run()
assertCountEquals(20, "test that update queue handles crashed update procs OK")
del(objs)
del(uq)
testEmptyQueue()
incrementTestCount()
var/list/objs = new
var/datum/updateQueue/uq = new(objs)
uq.Run()
assertCountEquals(0, "test that update queue doesn't barf on empty lists")
del(objs)
del(uq)
testManySlowItemsInQueue()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=30,i++)
objs.Add(new /datum/uqTestDatum/slow(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs)
uq.Run()
assertCountEquals(30, "test that update queue slows down execution if too many objects are slow to update")
del(objs)
del(uq)
testVariableWorkerTimeout()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=20,i++)
objs.Add(new /datum/uqTestDatum/slow(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs, workerTimeout=6)
uq.Run()
assertCountEquals(20, "test that variable worker timeout works properly")
del(objs)
del(uq)
testReallySlowItemInQueue()
incrementTestCount()
var/list/objs = new
for(var/i=1,i<=10,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
objs.Add(new /datum/uqTestDatum/reallySlow(updateQueueTestCount.len))
for(var/i=1,i<=10,i++)
objs.Add(new /datum/uqTestDatum/fast(updateQueueTestCount.len))
var/datum/updateQueue/uq = new(objs)
uq.Run()
assertCountEquals(20, "test that update queue skips objects that are too slow to update")
del(objs)
del(uq)
datum/uqTestDatum
var/testNum
New(testNum)
..()
src.testNum = testNum
proc/update()
updateQueueTestCount[testNum]++
proc/lag(cycles)
set background = 1
for(var/i=0,i<cycles,)
i++
datum/uqTestDatum/fast
datum/uqTestDatum/slow
update()
set background = 1
var/start = world.timeofday
while(world.timeofday - start < 5) // lag 4 deciseconds
..()
datum/uqTestDatum/reallySlow
update()
set background = 1
var/start = world.timeofday
while(world.timeofday - start < 300) // lag 30 seconds
..()
datum/uqTestDatum/crasher
update()
CRASH("I crashed! (I am supposed to crash XD)")
..() // This should do nothing lol

View File

@@ -1,24 +0,0 @@
/**
* testUpdateQueueProcess
* This process is an example of a process using an updateQueue.
* The datums updated by this process behave nicely and do not block.
*/
/datum/fastTestDatum/proc/wackyUpdateProcessName()
sleep(prob(10)) // Pretty quick, usually instant
/datum/controller/process/testUpdateQueueProcess
var/tmp/datum/updateQueue/updateQueueInstance
var/tmp/list/testDatums = list()
/datum/controller/process/testUpdateQueueProcess/setup()
name = "UpdateQueue Process"
schedule_interval = 20 // every 2 seconds
updateQueueInstance = new
for(var/i = 1, i < 30, i++)
testDatums.Add(new /datum/fastTestDatum)
/datum/controller/process/testUpdateQueueProcess/doWork()
updateQueueInstance.init(testDatums, "wackyUpdateProcessName")
updateQueueInstance.Run()

View File

@@ -1,13 +0,0 @@
/**
* testBadZombieProcess
* This process is an example of a simple update loop process that hangs.
*/
/datum/controller/process/testZombieProcess/setup()
name = "Zombie Process"
schedule_interval = 30 // every 3 seconds
/datum/controller/process/testZombieProcess/doWork()
for (var/i = 0, i < 1000, i++)
sleep(1)
scheck()