mirror of
https://github.com/vgstation-coders/vgstation13.git
synced 2025-12-10 10:21:11 +00:00
Revert "Merge pull request #27120 from ShiftyRail/rust_g"
This reverts commitda896f13a4, reversing changes made to2bc700b001.
This commit is contained in:
@@ -707,7 +707,7 @@
|
||||
if ("address")
|
||||
sqladdress = value
|
||||
if ("port")
|
||||
sqlport = text2num(value)
|
||||
sqlport = value
|
||||
if ("database")
|
||||
sqldb = value
|
||||
if ("login")
|
||||
|
||||
@@ -22,7 +22,8 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
var/last_error
|
||||
var/list/active_queries = list()
|
||||
|
||||
var/connection // Arbitrary handle returned from rust_g.
|
||||
var/datum/BSQL_Connection/connection
|
||||
var/datum/BSQL_Operation/connectOperation
|
||||
|
||||
/datum/subsystem/dbcore/New()
|
||||
NEW_SS_GLOBAL(SSdbcore)
|
||||
@@ -69,6 +70,7 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
|
||||
/datum/subsystem/dbcore/Recover()
|
||||
connection = SSdbcore.connection
|
||||
connectOperation = SSdbcore.connectOperation
|
||||
|
||||
/datum/subsystem/dbcore/Shutdown()
|
||||
//This is as close as we can get to the true round end before Disconnect() without changing where it's called, defeating the reason this is a subsystem
|
||||
@@ -76,6 +78,7 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
ShutdownQuery()
|
||||
if(IsConnected())
|
||||
Disconnect()
|
||||
world.BSQL_Shutdown()
|
||||
|
||||
/datum/subsystem/dbcore/proc/Connect()
|
||||
if(initialized && IsConnected())
|
||||
@@ -98,42 +101,40 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
var/db = ids["db"]
|
||||
var/address = ids["address"]
|
||||
var/port = ids["port"]
|
||||
var/timeout = max(config.async_query_timeout, config.blocking_query_timeout)
|
||||
var/thread_limit = config.bsql_thread_limit
|
||||
|
||||
var/result = json_decode(rustg_sql_connect_pool(json_encode(list(
|
||||
"host" = address,
|
||||
"port" = port,
|
||||
"user" = user,
|
||||
"pass" = pass,
|
||||
"db_name" = db,
|
||||
"max_threads" = 5,
|
||||
"read_timeout" = timeout,
|
||||
"write_timeout" = timeout,
|
||||
"max_threads" = thread_limit,
|
||||
))))
|
||||
|
||||
. = (result["status"] == "ok")
|
||||
if (.)
|
||||
connection = result["handle"]
|
||||
else
|
||||
connection = new /datum/BSQL_Connection(BSQL_CONNECTION_TYPE_MARIADB, config.async_query_timeout, config.blocking_query_timeout, config.bsql_thread_limit)
|
||||
var/error
|
||||
if(!connection || connection.gcDestroyed)
|
||||
connection = null
|
||||
last_error = result["data"]
|
||||
log_sql("Connect() failed | [last_error]")
|
||||
error = last_error
|
||||
else
|
||||
SSdbcore.last_error = null
|
||||
connectOperation = connection.BeginConnect(address, port, user, pass, db)
|
||||
// This is a bit of a hack. It sets initialized to be true BEFORE 'UNTIL', which sleeps. All the code done before is done synchronously.
|
||||
// This ensures that the server establishes a connection and a connection operation before any call to IsBanned(), for instance, is made.
|
||||
// Now IsBanned() calls will patiently wait on a DB connection to exist to be resolved, preventing queries to be queued up before an actual connection is made.
|
||||
initialized = TRUE
|
||||
if(SSdbcore.last_error)
|
||||
CRASH(SSdbcore.last_error)
|
||||
UNTIL(connectOperation.IsComplete())
|
||||
error = connectOperation.GetError()
|
||||
. = !error
|
||||
if (!.)
|
||||
last_error = error
|
||||
log_sql("Connect() failed | [error]")
|
||||
++failed_connections
|
||||
|
||||
/datum/subsystem/dbcore/proc/Disconnect()
|
||||
failed_connections = 0
|
||||
if (connection)
|
||||
rustg_sql_disconnect_pool(connection)
|
||||
connection = null
|
||||
BSQL_DEL_CALL(connection)
|
||||
BSQL_DEL_CALL(connectOperation)
|
||||
initialized = 0
|
||||
connection = null
|
||||
connectOperation = null
|
||||
|
||||
/datum/subsystem/dbcore/proc/CheckSchemaVersion()
|
||||
if(config.sql_enabled)
|
||||
if(Connect())
|
||||
log_world("Database connection established.")
|
||||
/*
|
||||
var/datum/DBQuery/query_db_version = NewQuery(/**xx**/"SELECT major, minor FROM schema_revision ORDER BY date DESC LIMIT 1")
|
||||
var/datum/DBQuery/query_db_version = NewQuery("SELECT major, minor FROM schema_revision ORDER BY date DESC LIMIT 1")
|
||||
query_db_version.Execute()
|
||||
if(query_db_version.NextRow())
|
||||
db_major = text2num(query_db_version.item[1])
|
||||
@@ -167,10 +168,10 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
/datum/subsystem/dbcore/proc/SetRoundID()
|
||||
if(!Connect())
|
||||
return
|
||||
var/datum/DBQuery/query_round_initialize = SSdbcore.NewQuery(/**xx**/"")
|
||||
var/datum/DBQuery/query_round_initialize = SSdbcore.NewQuery("INSERT INTO [format_table_name("round")] (initialize_datetime, server_ip, server_port) VALUES (Now(), INET_ATON(IF('[world.internet_address]' LIKE '', '0', '[world.internet_address]')), '[world.port]')")
|
||||
query_round_initialize.Execute(async = FALSE)
|
||||
qdel(query_round_initialize)
|
||||
var/datum/DBQuery/query_round_last_id = SSdbcore.NewQuery(/**xx**/"SELECT LAST_INSERT_ID()")
|
||||
var/datum/DBQuery/query_round_last_id = SSdbcore.NewQuery("SELECT LAST_INSERT_ID()")
|
||||
query_round_last_id.Execute(async = FALSE)
|
||||
if(query_round_last_id.NextRow(async = FALSE))
|
||||
GLOB.round_id = query_round_last_id.item[1]
|
||||
@@ -179,29 +180,52 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
/datum/subsystem/dbcore/proc/SetRoundStart()
|
||||
if(!Connect())
|
||||
return
|
||||
var/datum/DBQuery/query_round_start = SSdbcore.NewQuery()
|
||||
var/datum/DBQuery/query_round_start = SSdbcore.NewQuery("UPDATE [format_table_name("round")] SET start_datetime = Now() WHERE id = [GLOB.round_id]")
|
||||
query_round_start.Execute()
|
||||
qdel(query_round_start)
|
||||
|
||||
/datum/subsystem/dbcore/proc/SetRoundEnd()
|
||||
if(!Connect())
|
||||
return
|
||||
//var/datum/DBQuery/query_round_end = SSdbcore.NewQuery()
|
||||
var/sql_station_name = sanitizeSQL(station_name())
|
||||
//var/datum/DBQuery/query_round_end = SSdbcore.NewQuery("UPDATE [format_table_name("round")] SET end_datetime = Now(), game_mode_result = '[sanitizeSQL(SSticker.mode_result)]', station_name = '[sql_station_name]' WHERE id = [GLOB.round_id]")
|
||||
query_round_end.Execute()
|
||||
qdel(query_round_end)
|
||||
|
||||
/datum/subsystem/dbcore/proc/ShutdownQuery()
|
||||
var/datum/DBQuery/query_round_shutdown = SSdbcore.NewQuery()
|
||||
var/datum/DBQuery/query_round_shutdown = SSdbcore.NewQuery("UPDATE [format_table_name("round")] SET shutdown_datetime = Now(), end_state = '[sanitizeSQL(SSticker.end_state)]' WHERE id = [GLOB.round_id]")
|
||||
query_round_shutdown.Execute()
|
||||
qdel(query_round_shutdown)
|
||||
*/
|
||||
|
||||
/datum/subsystem/dbcore/proc/Disconnect()
|
||||
initialized = 0
|
||||
failed_connections = 0
|
||||
BSQL_DEL_CALL(connectOperation)
|
||||
BSQL_DEL_CALL(connection)
|
||||
connectOperation = null
|
||||
connection = null
|
||||
|
||||
/datum/subsystem/dbcore/proc/IsConnected()
|
||||
if(!config.sql_enabled)
|
||||
return FALSE
|
||||
if (!connection)
|
||||
return FALSE
|
||||
return json_decode(rustg_sql_connected(connection))["status"] == "online"
|
||||
//block until any connect operations finish
|
||||
var/datum/BSQL_Connection/_connection = connection
|
||||
var/datum/BSQL_Operation/op = connectOperation
|
||||
var/ticker = 0
|
||||
while ( (!_connection || _connection.gcDestroyed) || !op.IsComplete() ) // Waiting we have a real connection and that it's complete
|
||||
stoplag()
|
||||
ticker++
|
||||
if (ticker > 100 && (!connection || !connectOperation || connection.gcDestroyed))
|
||||
message_admins("Error getting connection status. Attempting to reconnect.")
|
||||
Disconnect()
|
||||
Connect()
|
||||
return
|
||||
return connection && !(connection.gcDestroyed && !op.GetError()) // Connect
|
||||
|
||||
/datum/subsystem/dbcore/proc/Quote(str)
|
||||
if(connection)
|
||||
return connection.Quote(str)
|
||||
|
||||
/datum/subsystem/dbcore/proc/ErrorMsg()
|
||||
if(!config.sql_enabled)
|
||||
@@ -212,8 +236,8 @@ var/datum/subsystem/dbcore/SSdbcore
|
||||
last_error = error
|
||||
|
||||
//
|
||||
/datum/subsystem/dbcore/proc/NewQuery(sql_query, arguments)
|
||||
return new /datum/DBQuery(connection, sql_query, arguments)
|
||||
/datum/subsystem/dbcore/proc/NewQuery(sql_query)
|
||||
return new /datum/DBQuery(sql_query, connection)
|
||||
|
||||
/datum/subsystem/dbcore/proc/QuerySelect(list/querys, warn = FALSE, qdel = FALSE)
|
||||
if (!islist(querys))
|
||||
@@ -246,60 +270,55 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
|
||||
It was included because it is still supported in mariadb.
|
||||
It does not work with duplicate_key and the mysql server ignores it in those cases
|
||||
*/
|
||||
/datum/subsystem/dbcore/proc/MassInsert(table, list/rows, duplicate_key = FALSE, ignore_errors = FALSE, delayed = FALSE, warn = FALSE, async = TRUE, special_columns = null)
|
||||
/datum/subsystem/dbcore/proc/MassInsert(table, list/rows, duplicate_key = FALSE, ignore_errors = FALSE, delayed = FALSE, warn = FALSE, async = TRUE)
|
||||
if (!table || !rows || !istype(rows))
|
||||
return
|
||||
|
||||
// Prepare column list
|
||||
var/list/columns = list()
|
||||
var/list/has_question_mark = list()
|
||||
var/list/sorted_rows = list()
|
||||
|
||||
for (var/list/row in rows)
|
||||
var/list/sorted_row = list()
|
||||
sorted_row.len = columns.len
|
||||
for (var/column in row)
|
||||
columns[column] = "?"
|
||||
has_question_mark[column] = TRUE
|
||||
for (var/column in special_columns)
|
||||
columns[column] = special_columns[column]
|
||||
has_question_mark[column] = findtext(special_columns[column], "?")
|
||||
var/idx = columns[column]
|
||||
if (!idx)
|
||||
idx = columns.len + 1
|
||||
columns[column] = idx
|
||||
sorted_row.len = columns.len
|
||||
|
||||
// Prepare SQL query full of placeholders
|
||||
var/list/query_parts = list("INSERT")
|
||||
if (delayed)
|
||||
query_parts += " DELAYED"
|
||||
if (ignore_errors)
|
||||
query_parts += " IGNORE"
|
||||
query_parts += " INTO "
|
||||
query_parts += table
|
||||
query_parts += "\n([columns.Join(", ")])\nVALUES"
|
||||
|
||||
var/list/arguments = list()
|
||||
var/has_row = FALSE
|
||||
for (var/list/row in rows)
|
||||
if (has_row)
|
||||
query_parts += ","
|
||||
query_parts += "\n ("
|
||||
var/has_col = FALSE
|
||||
for (var/column in columns)
|
||||
if (has_col)
|
||||
query_parts += ", "
|
||||
if (has_question_mark[column])
|
||||
var/name = "p[arguments.len]"
|
||||
query_parts += replacetext(columns[column], "?", ":[name]")
|
||||
arguments[name] = row[column]
|
||||
else
|
||||
query_parts += columns[column]
|
||||
has_col = TRUE
|
||||
query_parts += ")"
|
||||
has_row = TRUE
|
||||
sorted_row[idx] = row[column]
|
||||
sorted_rows[++sorted_rows.len] = sorted_row
|
||||
|
||||
if (duplicate_key == TRUE)
|
||||
var/list/column_list = list()
|
||||
for (var/column in columns)
|
||||
column_list += "[column] = VALUES([column])"
|
||||
query_parts += "\nON DUPLICATE KEY UPDATE [column_list.Join(", ")]"
|
||||
else if (duplicate_key != FALSE)
|
||||
query_parts += duplicate_key
|
||||
duplicate_key = "ON DUPLICATE KEY UPDATE [column_list.Join(", ")]\n"
|
||||
else if (duplicate_key == FALSE)
|
||||
duplicate_key = null
|
||||
|
||||
var/datum/DBQuery/Query = NewQuery(query_parts.Join(), arguments)
|
||||
if (ignore_errors)
|
||||
ignore_errors = " IGNORE"
|
||||
else
|
||||
ignore_errors = null
|
||||
|
||||
if (delayed)
|
||||
delayed = " DELAYED"
|
||||
else
|
||||
delayed = null
|
||||
|
||||
var/list/sqlrowlist = list()
|
||||
var/len = columns.len
|
||||
for (var/list/row in sorted_rows)
|
||||
if (length(row) != len)
|
||||
row.len = len
|
||||
for (var/value in row)
|
||||
if (value == null)
|
||||
value = "NULL"
|
||||
sqlrowlist += "([row.Join(", ")])"
|
||||
|
||||
sqlrowlist = " [sqlrowlist.Join(",\n ")]"
|
||||
var/datum/DBQuery/Query = NewQuery("INSERT[delayed][ignore_errors] INTO [table]\n([columns.Join(", ")])\nVALUES\n[sqlrowlist]\n[duplicate_key]")
|
||||
if (warn)
|
||||
. = Query.warn_execute(async)
|
||||
else
|
||||
@@ -307,32 +326,25 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
|
||||
qdel(Query)
|
||||
|
||||
/datum/DBQuery
|
||||
// Inputs
|
||||
var/connection
|
||||
var/sql
|
||||
var/arguments
|
||||
var/sql // The sql query being executed.
|
||||
var/list/item //list of data values populated by NextRow()
|
||||
|
||||
// Status information
|
||||
var/in_progress
|
||||
var/last_error
|
||||
var/last_activity
|
||||
var/last_activity_time
|
||||
|
||||
// Output
|
||||
var/list/list/rows
|
||||
var/next_row_to_take = 1
|
||||
var/affected
|
||||
var/last_insert_id
|
||||
var/last_error
|
||||
var/skip_next_is_complete
|
||||
var/in_progress
|
||||
var/datum/BSQL_Connection/connection
|
||||
var/datum/BSQL_Operation/Query/query
|
||||
|
||||
var/list/item //list of data values populated by NextRow()
|
||||
|
||||
/datum/DBQuery/New(connection, sql, arguments)
|
||||
/datum/DBQuery/New(sql_query, datum/BSQL_Connection/connection)
|
||||
log_query_debug("new query with SQL : {[sql_query]} \n on [SSdbcore.name]")
|
||||
SSdbcore.active_queries[src] = TRUE
|
||||
Activity("Created")
|
||||
item = list()
|
||||
src.connection = connection
|
||||
src.sql = sql
|
||||
src.arguments = arguments
|
||||
sql = sql_query
|
||||
|
||||
/datum/DBQuery/Destroy()
|
||||
log_query_debug("query with [sql] being qdeleted. Will die any second now.")
|
||||
@@ -344,6 +356,12 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
|
||||
log_query_debug("query with [sql] died.")
|
||||
return ..()
|
||||
|
||||
/datum/DBQuery/proc/SetQuery(new_sql)
|
||||
if(in_progress)
|
||||
CRASH("Attempted to set new sql while waiting on active query")
|
||||
Close()
|
||||
sql = new_sql
|
||||
|
||||
/datum/DBQuery/proc/Activity(activity)
|
||||
last_activity = activity
|
||||
last_activity_time = world.time
|
||||
@@ -358,18 +376,30 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
|
||||
if(in_progress)
|
||||
CRASH("Attempted to start a new query while waiting on the old one")
|
||||
|
||||
if(!SSdbcore.IsConnected())
|
||||
if(!connection || connection.gcDestroyed)
|
||||
last_error = "No connection!"
|
||||
return FALSE
|
||||
|
||||
var/start_time
|
||||
var/timed_out
|
||||
if(!async)
|
||||
start_time = REALTIMEOFDAY
|
||||
Close()
|
||||
. = run_query(async)
|
||||
var/timed_out = !. && findtext(last_error, "Operation timed out")
|
||||
timed_out = run_query(async)
|
||||
if(query.GetErrorCode() == 2006) //2006 is the return code for "MySQL server has gone away" time-out error, meaning the connection has been lost to the server (if it's still alive)
|
||||
log_sql("Executing query encountered returned a lost database connection (2006).")
|
||||
SSdbcore.Disconnect()
|
||||
if(SSdbcore.Connect()) //connection was restablished, reattempt the query
|
||||
log_sql("Connection restablished")
|
||||
timed_out = run_query(async)
|
||||
else
|
||||
log_sql("Executing query failed to restablish database connection.")
|
||||
skip_next_is_complete = TRUE
|
||||
var/error = (!query || query.gcDestroyed) ? "Query object deleted!" : query.GetError()
|
||||
last_error = error
|
||||
. = !error
|
||||
if(!. && log_error)
|
||||
log_sql("[last_error] | Query used: [sql]")
|
||||
log_sql("[error] | Query used: [sql]")
|
||||
if(!async && timed_out)
|
||||
log_query_debug("Query execution started at [start_time]")
|
||||
log_query_debug("Query execution ended at [REALTIMEOFDAY]")
|
||||
@@ -378,50 +408,53 @@ Delayed insert mode was removed in mysql 7 and only works with MyISAM type table
|
||||
slow_query_check()
|
||||
|
||||
/datum/DBQuery/proc/run_query(async)
|
||||
var/job_result_str
|
||||
|
||||
if (async)
|
||||
var/job_id = rustg_sql_query_async(connection, sql, json_encode(arguments))
|
||||
in_progress = TRUE
|
||||
UNTIL((job_result_str = rustg_sql_check_query(job_id)) != RUSTG_JOB_NO_RESULTS_YET)
|
||||
in_progress = FALSE
|
||||
|
||||
if (job_result_str == RUSTG_JOB_ERROR)
|
||||
last_error = job_result_str
|
||||
return FALSE
|
||||
query = connection.BeginQuery(sql)
|
||||
if(!async)
|
||||
. = !query.WaitForCompletion()
|
||||
else
|
||||
job_result_str = rustg_sql_query_blocking(connection, sql, json_encode(arguments))
|
||||
|
||||
var/result = json_decode(job_result_str)
|
||||
switch (result["status"])
|
||||
if ("ok")
|
||||
rows = result["rows"]
|
||||
affected = result["affected"]
|
||||
last_insert_id = result["last_insert_id"]
|
||||
return TRUE
|
||||
if ("err")
|
||||
last_error = result["data"]
|
||||
return FALSE
|
||||
if ("offline")
|
||||
last_error = "offline"
|
||||
return FALSE
|
||||
in_progress = TRUE
|
||||
UNTIL(query.IsComplete())
|
||||
in_progress = FALSE
|
||||
|
||||
/datum/DBQuery/proc/slow_query_check()
|
||||
message_admins("HEY! A database query timed out. Tell coders or Pomf what happened, please.")
|
||||
|
||||
/datum/DBQuery/proc/NextRow(async = TRUE)
|
||||
Activity("NextRow")
|
||||
|
||||
if (rows && next_row_to_take <= rows.len)
|
||||
item = rows[next_row_to_take]
|
||||
next_row_to_take++
|
||||
return !!item
|
||||
UNTIL(!in_progress)
|
||||
if(!skip_next_is_complete)
|
||||
if(!async)
|
||||
query.WaitForCompletion()
|
||||
else
|
||||
in_progress = TRUE
|
||||
UNTIL(query.IsComplete())
|
||||
in_progress = FALSE
|
||||
else
|
||||
return FALSE
|
||||
skip_next_is_complete = FALSE
|
||||
|
||||
last_error = query.GetError()
|
||||
var/list/results = query.CurrentRow()
|
||||
. = results != null
|
||||
|
||||
item.Cut()
|
||||
//populate item array
|
||||
for(var/I in results)
|
||||
item += results[I]
|
||||
|
||||
/datum/DBQuery/proc/ErrorMsg()
|
||||
return last_error
|
||||
|
||||
/datum/DBQuery/proc/Close()
|
||||
rows = null
|
||||
item = null
|
||||
item.Cut()
|
||||
qdel(query)
|
||||
query = null
|
||||
|
||||
/world/BSQL_Debug(message)
|
||||
if(!config.bsql_debug)
|
||||
return
|
||||
|
||||
//strip sensitive stuff
|
||||
if(findtext(message, ": OpenConnection("))
|
||||
message = "OpenConnection CENSORED"
|
||||
|
||||
log_sql("BSQL_DEBUG: [message]")
|
||||
|
||||
Reference in New Issue
Block a user