Add a new and improved mapmerge (#33869)

Key benefits of the new mapmerge include: multi-Z support, effective
reuse of deleted keys, automatic handling of key overflow, and utilizing
a git pre-commit hook to eliminate the need to run batch files manually.
This commit is contained in:
Tad Hardesty
2017-12-28 13:01:34 -08:00
committed by ShizCalev
parent d95d4cb9d6
commit 9639061433
14 changed files with 801 additions and 0 deletions

14
tools/hooks/install.bat Normal file
View File

@@ -0,0 +1,14 @@
@echo off
rem Installs every *.hook file in this directory as a git hook and registers
rem every *.merge file as a git merge driver by appending to .git\config.
rem /d lets the directory change cross drives; the quotes tolerate spaces.
cd /d "%~dp0"
for %%f in (*.hook) do (
	echo Installing hook: %%~nf
	copy "%%f" "..\..\.git\hooks\%%~nf" >nul
)
rem The caret followed by an empty line embeds a newline in the echoed text,
rem so the section header and the driver line land on separate config lines.
for %%f in (*.merge) do (
	echo Installing merge driver: %%~nf
	echo [merge "%%~nf"]^

	driver = tools/hooks/%%f %%P %%O %%A %%B %%L >> ..\..\.git\config
)
echo Done
pause

11
tools/hooks/install.sh Normal file
View File

@@ -0,0 +1,11 @@
#!/bin/bash
# Installs every *.hook file in this directory as a git hook and registers
# every *.merge file as a git merge driver via `git config`.
cd "$(dirname "$0")" || exit 1
for f in *.hook; do
	# an unmatched glob is left as the literal pattern; skip it
	[ -e "$f" ] || continue
	echo "Installing hook: ${f%.hook}"
	cp "$f" "../../.git/hooks/${f%.hook}"
done
for f in *.merge; do
	[ -e "$f" ] || continue
	echo "Installing merge driver: ${f%.merge}"
	git config --replace-all "merge.${f%.merge}.driver" "tools/hooks/$f %P %O %A %B %L"
done
echo "Done"

View File

@@ -0,0 +1,2 @@
#!/bin/bash
# Replaces this shell with the python.sh wrapper, asking it to run the
# `precommit` module. The path is relative to the current directory.
exec tools/hooks/python.sh -m precommit

17
tools/hooks/python.sh Normal file
View File

@@ -0,0 +1,17 @@
#!/bin/bash
# Locates a Python interpreter, verifies it is 3.6+, puts tools/mapmerge2/ on
# PYTHONPATH and then runs the interpreter with the arguments given to this
# script.
set -e
if command -v python3 >/dev/null 2>&1; then
	PY=python3
else
	PY=python
fi
# Ask the interpreter itself for the path separator so the PYTHONPATH entry is
# joined correctly for that interpreter; a failed version check aborts here
# because of `set -e`.
PATHSEP=$($PY - <<'EOF'
import sys, os
if sys.version_info.major != 3 or sys.version_info.minor < 6:
    sys.stderr.write("Python 3.6+ is required: " + sys.version + "\n")
    sys.exit(1)
print(os.pathsep)
EOF
)
# Only append the separator when there is an existing PYTHONPATH to keep.
export PYTHONPATH="tools/mapmerge2/${PYTHONPATH:+${PATHSEP}${PYTHONPATH}}"
exec "$PY" "$@"

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env python3
import frontend
import dmm
# Script entry point: rewrite every selected map in place, in TGM format when
# settings.tgm is true and in DMM format otherwise.
if __name__ == '__main__':
    settings = frontend.read_settings()

    # frontend.process yields one file name per map to convert
    for fname in frontend.process(settings, "convert"):
        dmm.DMM.from_file(fname).to_file(fname, settings.tgm)

459
tools/mapmerge2/dmm.py Normal file
View File

@@ -0,0 +1,459 @@
# Tools for working with DreamMaker maps
import io
import bidict
import random
from collections import namedtuple
# Comment line that save_tgm writes as the first line of its output.
TGM_HEADER = "//MAP CONVERTED BY dmm2tgm.py THIS HEADER COMMENT PREVENTS RECONVERSION, DO NOT REMOVE"
# Text encoding used for every map read and write in this module.
ENCODING = 'utf-8'
# An (x, y, z) triple; the parser uses it for the map dimensions.
Coordinate = namedtuple('Coordinate', ['x', 'y', 'z'])
class DMM:
    """A DreamMaker map held in memory.

    Attributes (fixed by ``__slots__``):
        key_length: number of base-52 characters used to spell each key.
        size: map dimensions, read through ``.x``, ``.y`` and ``.z``.
        dictionary: ``bidict`` from numeric key to tile contents.
        grid: dict from ``(x, y, z)`` to a numeric dictionary key.
        header: optional text the writers emit ahead of the dictionary.
    """
    __slots__ = ['key_length', 'size', 'dictionary', 'grid', 'header']

    def __init__(self, key_length, size):
        """Create an empty map with the given key length and size."""
        self.key_length = key_length
        self.size = size
        self.dictionary = bidict.bidict()
        self.grid = {}
        self.header = None

    @staticmethod
    def from_file(fname):
        """Parse the map stored in the file *fname* and return a DMM."""
        with open(fname, 'r', encoding=ENCODING) as f:
            # Stream the file rather than forcing all its contents to memory.
            # Iterating by line and then by character yields the same
            # character sequence as repeated f.read(1), without one read()
            # call per character.
            return _parse(char for line in f for char in line)

    @staticmethod
    def from_bytes(bytes):
        """Parse a map from its encoded byte representation."""
        return _parse(bytes.decode(ENCODING))

    def to_file(self, fname, tgm = True):
        """Write this map to *fname*, as TGM when *tgm* is true, else DMM."""
        with open(fname, 'w', newline='\n', encoding=ENCODING) as f:
            (save_tgm if tgm else save_dmm)(self, f)

    def to_bytes(self, tgm = True):
        """Return this map serialized to bytes (TGM when *tgm* is true)."""
        bio = io.BytesIO()
        with io.TextIOWrapper(bio, newline='\n', encoding=ENCODING) as f:
            (save_tgm if tgm else save_dmm)(self, f)
            f.flush()
            # read the buffer before the wrapper is closed by the with-block
            return bio.getvalue()

    def generate_new_key(self):
        """Return a numeric key that is not yet in ``self.dictionary``.

        ``self.key_length`` is increased first if every key of the current
        length is already taken.
        """
        # ensure that free keys exist by increasing the key length if necessary
        free_keys = (BASE ** self.key_length) - len(self.dictionary)
        while free_keys <= 0:
            self.key_length += 1
            free_keys = (BASE ** self.key_length) - len(self.dictionary)

        # choose one of the free keys at random
        key = 0
        while free_keys:
            if key not in self.dictionary:
                # this construction is used to avoid needing to construct the
                # full set in order to random.choice() from it
                if random.random() < 1 / free_keys:
                    return key
                free_keys -= 1
            key += 1

        raise RuntimeError("ran out of keys, this shouldn't happen")

    @property
    def coords_zyx(self):
        """Yield every ``(z, y, x)`` of the map, 1-based, x varying fastest."""
        for z in range(1, self.size.z + 1):
            for y in range(1, self.size.y + 1):
                for x in range(1, self.size.x + 1):
                    yield (z, y, x)

    @property
    def coords_z(self):
        """Range of the 1-based z levels of the map."""
        return range(1, self.size.z + 1)

    @property
    def coords_yx(self):
        """Yield every ``(y, x)`` of one level, 1-based, x varying fastest."""
        for y in range(1, self.size.y + 1):
            for x in range(1, self.size.x + 1):
                yield (y, x)
# ----------
# key handling
# Base 52 a-z A-Z dictionary for fast conversion
# Number of distinct characters available at each key position.
BASE = 52
# Digit alphabet: the index of a character is its digit value.
base52 = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
# Reverse lookup from character to digit value.
base52_r = {x: i for i, x in enumerate(base52)}
# Guards against a duplicated or missing character in the alphabet.
assert len(base52) == BASE and len(base52_r) == BASE
def key_to_num(key):
    """Convert a base-52 key string to its numeric value.

    The leftmost character is the most significant digit. An empty key gives
    0; a character outside the alphabet raises KeyError.
    """
    value = 0
    for symbol in key:
        digit = base52_r[symbol]
        value = value * BASE + digit
    return value
def num_to_key(num, key_length):
    """Spell *num* as a base-52 key of exactly *key_length* characters.

    The result is left-padded with the zero digit of the alphabet.

    Raises:
        ValueError: if *num* is negative (floor division would never reach
            zero, so the conversion loop could not terminate).
        KeyTooLarge: if *num* needs more than *key_length* characters.
    """
    if num < 0:
        raise ValueError(f"num={num} must not be negative")
    if num >= BASE ** key_length:
        raise KeyTooLarge(f"num={num} does not fit in key_length={key_length}")

    result = ''
    while num:
        num, digit = divmod(num, BASE)
        result = base52[digit] + result

    assert len(result) <= key_length
    return base52[0] * (key_length - len(result)) + result
class KeyTooLarge(Exception):
    """Raised by num_to_key when a number does not fit the key length."""
    pass
# ----------
# An actual atom parser
def parse_map_atom(atom):
    """Split an atom string into its path and its variable edits.

    Args:
        atom: text such as ``/obj/item{name = "x"; dir = 4}``.

    Returns:
        A ``(path, edits)`` pair. ``path`` is everything before the first
        ``{`` (the whole string when there is none) and ``edits`` maps each
        variable name to its value text. Quoted values keep their quotes; a
        backslash is dropped and the character after it is kept verbatim.
        Spaces outside quotes are discarded.
    """
    brace = atom.find('{')
    if brace < 0:
        return atom, {}

    path, rest = atom[:brace], atom[brace + 1:]
    var_edits = {}
    in_string = False
    escaping = False
    current_name = ''
    current = ''
    for ch in rest:
        if escaping:
            # character following a backslash is taken literally
            escaping = False
            current += ch
        elif ch == '\\':
            escaping = True
        elif ch == '"':
            in_string = not in_string
            current += ch
        elif in_string:
            current += ch
        elif ch == ';':
            # end of one "name = value" pair
            var_edits[current_name.strip()] = current.strip()
            current_name = current = ''
        elif ch == '=':
            current_name = current
            current = ''
        elif ch == '}':
            # end of the edit block; anything after it is ignored
            var_edits[current_name.strip()] = current.strip()
            break
        elif ch != ' ':
            current += ch
    return path, var_edits
# ----------
# TGM writer
def save_tgm(dmm, output):
    """Write *dmm* to the text stream *output* in TGM format.

    The dictionary is written one atom per line with each variable edit on
    its own tab-indented line; the grid is written one column at a time with
    one key per line.
    """
    output.write(f"{TGM_HEADER}\n")
    if dmm.header:
        output.write(f"{dmm.header}\n")

    # write dictionary in tgm format
    for key, value in sorted(dmm.dictionary.items()):
        output.write(f'"{num_to_key(key, dmm.key_length)}" = (\n')
        for idx, thing in enumerate(value):
            in_quote_block = False
            in_varedit_block = False
            escaping = False
            for char in thing:
                if in_quote_block:
                    # a backslash-escaped quote must not end the string,
                    # otherwise the rest of the value is formatted as code
                    if escaping:
                        escaping = False
                    elif char == "\\":
                        escaping = True
                    elif char == '"':
                        in_quote_block = False
                    output.write(char)
                elif char == '"':
                    in_quote_block = True
                    output.write(char)
                elif not in_varedit_block:
                    if char == "{":
                        in_varedit_block = True
                        output.write("{\n\t")
                    else:
                        output.write(char)
                elif char == ";":
                    output.write(";\n\t")
                elif char == "}":
                    output.write("\n\t}")
                    in_varedit_block = False
                else:
                    output.write(char)
            if idx < len(value) - 1:
                output.write(",\n")
        output.write(")\n")

    # thanks to YotaXP for finding out about this one
    max_x, max_y, max_z = dmm.size
    for z in range(1, max_z + 1):
        output.write("\n")
        for x in range(1, max_x + 1):
            # each block is one column: it starts at y=1 and lists one key
            # per line
            output.write(f"({x},{1},{z}) = {{\"\n")
            for y in range(1, max_y + 1):
                output.write(f"{num_to_key(dmm.grid[x, y, z], dmm.key_length)}\n")
            output.write("\"}\n")
# ----------
# DMM writer
def save_dmm(dmm, output):
    """Write *dmm* to the text stream *output* in plain DMM format.

    Each dictionary entry goes on a single line and each z-level is written
    as one block with a whole row of keys per line. A grid cell with no
    entry is reported on stdout and left out of its row.
    """
    if dmm.header:
        output.write(f"{dmm.header}\n")

    # writes a tile dictionary the same way Dreammaker does
    for key, tile in sorted(dmm.dictionary.items()):
        key_text = num_to_key(key, dmm.key_length)
        atoms = ",".join(tile)
        output.write(f'"{key_text}" = ({atoms})\n')

    output.write("\n")

    # writes a map grid the same way Dreammaker does
    width, height, depth = dmm.size
    for z in range(1, depth + 1):
        output.write(f"(1,1,{z}) = {{\"\n")

        for y in range(1, height + 1):
            for x in range(1, width + 1):
                try:
                    cell = dmm.grid[x, y, z]
                    output.write(num_to_key(cell, dmm.key_length))
                except KeyError:
                    print(f"Key error: ({x}, {y}, {z})")
            output.write("\n")
        output.write("\"}\n")
# ----------
# Parser
def _parse(map_raw_text):
    """Parse map text into a DMM.

    Args:
        map_raw_text: any iterable that yields the map one character at a
            time (a ``str`` or a character iterator).

    Returns:
        A DMM whose dictionary, grid, key length and size are taken from the
        text. Dictionary entries whose contents duplicate an earlier entry
        are dropped and grid cells using them are redirected to the earlier
        key.

    The text is consumed in two passes over one shared iterator: first the
    key dictionary, then (from the first coordinate block on) the grid.
    """
    in_comment_line = False
    comment_trigger = False

    in_quote_block = False
    in_key_block = False
    in_data_block = False
    in_varedit_block = False
    after_data_block = False
    escaping = False
    skip_whitespace = False

    dictionary = bidict.bidict()
    duplicate_keys = {}
    curr_key_len = 0
    curr_key = 0
    curr_datum = ""
    curr_data = list()

    in_coord_block = False
    in_map_string = False
    iter_x = 0
    adjust_y = True

    curr_num = ""
    reading_coord = "x"

    key_length = 0

    maxx = 0
    maxy = 0
    maxz = 0

    curr_x = 0
    curr_y = 0
    curr_z = 0
    grid = dict()

    it = iter(map_raw_text)

    # map block
    for char in it:
        # newlines and tabs are never part of the data; a newline also ends
        # a // comment
        if char == "\n":
            in_comment_line = False
            comment_trigger = False
            continue
        elif in_comment_line:
            continue
        elif char == "\t":
            continue

        # two consecutive slashes outside a string start a line comment
        if char == "/" and not in_quote_block:
            if comment_trigger:
                in_comment_line = True
                continue
            else:
                comment_trigger = True
        else:
            comment_trigger = False

        if in_data_block:
            if in_varedit_block:
                if in_quote_block:
                    # The escaping check must come first: otherwise the
                    # second backslash of an escaped backslash would start a
                    # new escape and a following quote would fail to close
                    # the string.
                    if escaping:
                        curr_datum = curr_datum + char
                        escaping = False
                    elif char == "\\":
                        curr_datum = curr_datum + char
                        escaping = True
                    elif char == "\"":
                        curr_datum = curr_datum + char
                        in_quote_block = False
                    else:
                        curr_datum = curr_datum + char
                else:
                    # drop the single space that follows a ';'
                    if skip_whitespace and char == " ":
                        skip_whitespace = False
                        continue
                    skip_whitespace = False

                    if char == "\"":
                        curr_datum = curr_datum + char
                        in_quote_block = True
                    elif char == ";":
                        skip_whitespace = True
                        curr_datum = curr_datum + char
                    elif char == "}":
                        curr_datum = curr_datum + char
                        in_varedit_block = False
                    else:
                        curr_datum = curr_datum + char

            elif char == "{":
                curr_datum = curr_datum + char
                in_varedit_block = True

            elif char == ",":
                curr_data.append(curr_datum)
                curr_datum = ""

            elif char == ")":
                curr_data.append(curr_datum)
                curr_data = tuple(curr_data)
                try:
                    dictionary[curr_key] = curr_data
                except bidict.ValueDuplicationError:
                    # if the map has duplicate values, eliminate them now
                    duplicate_keys[curr_key] = dictionary.inv[curr_data]
                curr_data = list()
                curr_datum = ""
                curr_key = 0
                curr_key_len = 0
                in_data_block = False
                after_data_block = True

            else:
                curr_datum = curr_datum + char

        elif in_key_block:
            if char == "\"":
                in_key_block = False
                # the first key fixes the key length; all others must match
                if key_length == 0:
                    key_length = curr_key_len
                else:
                    assert key_length == curr_key_len
            else:
                curr_key = BASE * curr_key + base52_r[char]
                curr_key_len += 1

        # else we're looking for a key block, a data block or the map block
        elif char == "\"":
            in_key_block = True
            after_data_block = False

        elif char == "(":
            if after_data_block:
                # a '(' directly after a data block opens the first
                # coordinate block: hand over to the grid loop below
                in_coord_block = True
                after_data_block = False
                curr_key = 0
                curr_key_len = 0
                break
            else:
                in_data_block = True
                after_data_block = False

    # grid block
    for char in it:
        if in_coord_block:
            if char == ",":
                if reading_coord == "x":
                    curr_x = int(curr_num)
                    if curr_x > maxx:
                        maxx = curr_x
                    iter_x = 0
                    curr_num = ""
                    reading_coord = "y"
                elif reading_coord == "y":
                    curr_y = int(curr_num)
                    if curr_y > maxy:
                        maxy = curr_y
                    curr_num = ""
                    reading_coord = "z"
                else:
                    raise ValueError("too many dimensions")

            elif char == ")":
                curr_z = int(curr_num)
                if curr_z > maxz:
                    maxz = curr_z
                in_coord_block = False
                reading_coord = "x"
                curr_num = ""

            else:
                curr_num = curr_num + char

        elif in_map_string:
            if char == "\"":
                in_map_string = False
                adjust_y = True
                # undo the row advance made by the newline before the quote
                curr_y -= 1

            elif char == "\n":
                # the newline right after the opening quote does not start
                # a new row
                if adjust_y:
                    adjust_y = False
                else:
                    curr_y += 1
                if curr_x > maxx:
                    maxx = curr_x
                if iter_x > 1:
                    curr_x = 1
                iter_x = 0

            else:
                curr_key = BASE * curr_key + base52_r[char]
                curr_key_len += 1
                if curr_key_len == key_length:
                    # the first key of a row sits at curr_x itself; each
                    # further key moves one column to the right
                    iter_x += 1
                    if iter_x > 1:
                        curr_x += 1

                    grid[curr_x, curr_y, curr_z] = duplicate_keys.get(curr_key, curr_key)
                    curr_key = 0
                    curr_key_len = 0

        # else look for coordinate block or a map string
        elif char == "(":
            in_coord_block = True
        elif char == "\"":
            in_map_string = True

    if curr_y > maxy:
        maxy = curr_y

    data = DMM(key_length, Coordinate(maxx, maxy, maxz))
    data.dictionary = dictionary
    data.grid = grid
    return data

View File

@@ -0,0 +1,5 @@
@echo off
rem Runs convert.py with the map folder set to ../../_maps/ and TGM=1,
rem then waits for a key press.
set MAPROOT=../../_maps/
set TGM=1
python convert.py
pause

127
tools/mapmerge2/frontend.py Normal file
View File

@@ -0,0 +1,127 @@
# Common code for the frontend interface of map tools
import sys
import os
import pathlib
import shutil
from collections import namedtuple
# Tool settings: the map folder (None when not found) and whether to write TGM.
Settings = namedtuple('Settings', ['map_folder', 'tgm'])
# Result of the interactive prompt: every map found plus the chosen indices.
MapsToRun = namedtuple('MapsToRun', ['files', 'indices'])
def string_to_num(s):
    """Return *s* parsed as an integer, or -1 when it is not a valid one."""
    try:
        parsed = int(s)
    except ValueError:
        parsed = -1
    return parsed
def read_settings():
    """Build the Settings for this run from the environment.

    MAPROOT names the map folder. Without it, ``_maps/`` is searched for in
    the current directory and then in up to seven parent levels; the folder
    is None when none of them exists. TGM output is on unless the TGM
    variable is set to something other than "1".
    """
    # discover map folder if needed
    map_folder = os.environ.get('MAPROOT')
    if map_folder is None:
        candidate = '_maps/'
        for _ in range(8):
            if os.path.exists(candidate):
                map_folder = candidate
                break
            candidate = os.path.join('..', candidate)

    # assume TGM is True by default
    tgm_flag = os.environ.get('TGM', "1")
    return Settings(map_folder, tgm_flag == "1")
def pretty_path(settings, path_str):
    """Return *path_str* shortened for display.

    A path inside ``settings.map_folder`` is shown relative to that folder.
    Any other path, or any path when no map folder is known, is returned
    unchanged.
    """
    if settings.map_folder:
        try:
            relative = os.path.relpath(path_str, settings.map_folder)
        except ValueError:
            # the two paths cannot be related (e.g. different drives)
            return path_str
        # a result that climbs out of the map folder is not a shortening
        if relative != os.pardir and not relative.startswith(os.pardir + os.sep):
            return relative
    return path_str
def prompt_maps(settings, verb):
    """List every .dmm file under the map folder and ask which to process.

    The answer is a comma-separated list of indices and inclusive ranges
    (``1,3-5,12``); entries that are malformed or out of range are silently
    ignored. Exits with status 1 when no map folder is configured.

    Returns:
        MapsToRun with all files found and the selected indices.
    """
    if not settings.map_folder:
        print("Could not autodetect the _maps folder, set MAPROOT")
        exit(1)

    list_of_files = [
        pathlib.Path(root, filename)
        for root, _dirs, filenames in os.walk(settings.map_folder)
        for filename in filenames
        if filename.endswith(".dmm")
    ]

    # print the numbered list, with a rule whenever the directory changes
    previous_dir = ""
    for number, map_path in enumerate(list_of_files):
        folder = map_path.parent
        if previous_dir != folder:
            print("--------------------------------")
            previous_dir = folder
        print("[{}]: {}".format(number, pretty_path(settings, str(map_path))))
    print("--------------------------------")

    answer = input("List the maps you want to " + verb + " (example: 1,3-5,12):\n")
    total = len(list_of_files)

    valid_indices = []
    for entry in answer.replace(" ", "").split(","):
        parts = entry.split("-")
        if len(parts) == 1:
            single = string_to_num(parts[0])
            if 0 <= single < total:
                valid_indices.append(single)
        elif len(parts) == 2:
            first = string_to_num(parts[0])
            last = string_to_num(parts[1])
            if 0 <= first <= last < total:
                valid_indices.extend(range(first, last + 1))

    return MapsToRun(list_of_files, valid_indices)
def process(settings, verb, *, modify=True, backup=None):
    """Select maps, ask for confirmation, then yield each map path in turn.

    This is a generator: the caller does its work on each yielded path.

    Args:
        settings: provides ``tgm`` and is passed on to pretty_path and
            prompt_maps.
        verb: word used in the prompts and messages (e.g. "convert").
        modify: whether the caller is going to modify the maps.
        backup: whether to copy each map to ``<path>.before`` before it is
            yielded; defaults to the value of *modify*.

    Maps come from the command-line arguments when there are any, otherwise
    from the interactive prompt. Nothing is yielded when no map is selected
    or when the confirmation prompt is answered with anything but Enter.
    """
    if backup is None:
        backup = modify  # by default, backup when we modify
    assert modify or not backup  # doesn't make sense to backup when not modifying

    if len(sys.argv) > 1:
        maps = sys.argv[1:]
    else:
        maps = prompt_maps(settings, verb)
        maps = [str(maps.files[i]) for i in maps.indices]
        print()

    if not maps:
        print("No maps selected.")
        return

    if modify:
        print(f"Maps WILL{'' if settings.tgm else ' NOT'} be converted to tgm.")
        if backup:
            print("Backups will be created with a \".before\" extension.")
        else:
            print("Warning: backups are NOT being taken.")

    print(f"\nWill {verb} these maps:")
    for path_str in maps:
        print(pretty_path(settings, path_str))

    try:
        confirm = input(f"\nPress Enter to {verb}...\n")
    except KeyboardInterrupt:
        # treat Ctrl-C at the prompt like any other non-empty answer
        confirm = "^C"
    if confirm != "":
        print(f"\nAborted.")
        return

    for path_str in maps:
        print(f' - {pretty_path(settings, path_str)}')

        if backup:
            shutil.copyfile(path_str, path_str + ".before")

        # NOTE(review): an exception raised in the caller's loop body is only
        # delivered here if the caller passes it in with .throw(); confirm
        # that this handler is reached as intended.
        try:
            yield path_str
        except Exception as e:
            print(f"Error: {e}")
        else:
            print("Succeeded.")

    print("\nFinished.")

View File

@@ -0,0 +1,5 @@
@echo off
rem Runs mapmerge.py with the map folder set to ../../_maps/ and TGM=1,
rem then waits for a key press.
set MAPROOT=../../_maps/
set TGM=1
python mapmerge.py
pause

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
import frontend
import shutil
from dmm import *
from collections import defaultdict
def merge_map(new_map, old_map, delete_unused=False):
    """Re-key *new_map* so that it reuses the keys of *old_map* where it can.

    Args:
        new_map: the map whose contents must be preserved.
        old_map: the earlier version whose key assignments are reused.
        delete_unused: accepted but not referenced by this function; unused
            old keys are always trimmed.

    Returns:
        *new_map* itself when the key lengths or sizes differ, otherwise a
        new DMM holding the same tiles as *new_map*.

    Raises:
        RuntimeError: if the final check finds a tile that differs from
            *new_map*.
    """
    if new_map.key_length != old_map.key_length:
        print("Warning: Key lengths differ, taking new map")
        print(f"  Old: {old_map.key_length}")
        print(f"  New: {new_map.key_length}")
        return new_map

    if new_map.size != old_map.size:
        print("Warning: Map dimensions differ, taking new map")
        print(f"  Old: {old_map.size}")
        print(f"  New: {new_map.size}")
        return new_map

    key_length, size = old_map.key_length, old_map.size
    merged = DMM(key_length, size)
    merged.dictionary = old_map.dictionary.copy()

    known_keys = dict()  # mapping from 'new' key to 'merged' key
    unused_keys = set(old_map.dictionary.keys())  # keys going unused

    # step one: parse the new version, compare it to the old version, merge both
    for z, y, x in new_map.coords_zyx:
        new_key = new_map.grid[x, y, z]
        # if this key has been processed before, it can immediately be merged
        try:
            merged.grid[x, y, z] = known_keys[new_key]
            continue
        except KeyError:
            pass

        # records the chosen key for this cell and for later uses of new_key
        def select_key(assigned):
            merged.grid[x, y, z] = known_keys[new_key] = assigned

        old_key = old_map.grid[x, y, z]
        old_tile = old_map.dictionary[old_key]
        new_tile = new_map.dictionary[new_key]

        # this tile is the exact same as before, so the old key is used
        if new_tile == old_tile:
            select_key(old_key)
            unused_keys.remove(old_key)

        # the tile is different here, but if it exists in the merged dictionary, that key can be used
        elif new_tile in merged.dictionary.inv:
            newold_key = merged.dictionary.inv[new_tile]
            select_key(newold_key)
            unused_keys.remove(newold_key)

        # the tile is brand new and it needs a new key, but if the old key isn't being used any longer it can be used instead
        elif old_tile not in new_map.dictionary.inv and old_key in unused_keys:
            merged.dictionary[old_key] = new_tile
            select_key(old_key)
            unused_keys.remove(old_key)

        # all other options ruled out, a brand new key is generated for the brand new tile
        else:
            fresh_key = merged.generate_new_key()
            merged.dictionary[fresh_key] = new_tile
            select_key(fresh_key)

    # step two: delete unused keys
    if unused_keys:
        print(f"Notice: Trimming {len(unused_keys)} unused dictionary keys.")
        for key in unused_keys:
            del merged.dictionary[key]

    # sanity check: that the merged map equals the new map
    for z, y, x in new_map.coords_zyx:
        new_tile = new_map.dictionary[new_map.grid[x, y, z]]
        merged_tile = merged.dictionary[merged.grid[x, y, z]]
        if new_tile != merged_tile:
            print(f"Error: the map has been mangled! This is a mapmerge bug!")
            print(f"At {x},{y},{z}.")
            print(f"Should be {new_tile}")
            print(f"Instead is {merged_tile}")
            raise RuntimeError()

    return merged
def main(settings):
    """Merge each selected map against its ``.backup`` copy, in place.

    For every path produced by frontend.process, the file ``<path>.backup``
    is read as the old version and ``<path>`` as the new version; the merged
    result overwrites ``<path>`` in the format chosen by ``settings.tgm``.
    """
    for fname in frontend.process(settings, "merge", backup=True):
        # presumably the .backup file is created by a separate preparation
        # step before the map is edited; it is not written anywhere here
        old_map = DMM.from_file(fname + ".backup")
        new_map = DMM.from_file(fname)
        # merge_map takes (new_map, old_map): the new map's contents are the
        # ones that must survive the merge
        merge_map(new_map, old_map).to_file(fname, settings.tgm)
# Script entry point: run the merge with settings read from the environment.
if __name__ == '__main__':
    main(frontend.read_settings())

View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
import os
import pygit2
import dmm
from mapmerge import merge_map
def main(repo):
    """Merge every staged .dmm change over its HEAD version.

    For each .dmm path that is new or modified in the index, the staged map
    is merged over the HEAD version (or taken as-is when HEAD has none), the
    result is staged in its place, and the working copy is rewritten unless
    it has changes that are not in the index.

    Returns:
        1 when the index has conflicts, otherwise 0.
    """
    if repo.index.conflicts:
        print("You need to resolve merge conflicts first.")
        return 1

    changed = 0
    for path, status in repo.status().items():
        if not path.endswith(".dmm"):
            continue
        if not (status & (pygit2.GIT_STATUS_INDEX_MODIFIED | pygit2.GIT_STATUS_INDEX_NEW)):
            continue

        # read the index
        index_entry = repo.index[path]
        staged_map = dmm.DMM.from_bytes(repo[index_entry.id].read_raw())

        try:
            head_blob = repo[repo[repo.head.target].tree[path].id]
        except KeyError:
            # New map, no entry in HEAD
            print(f"Converting new map: {path}")
            assert (status & pygit2.GIT_STATUS_INDEX_NEW)
            merged_map = staged_map
        else:
            # Entry in HEAD, merge the index over it
            print(f"Merging map: {path}")
            assert not (status & pygit2.GIT_STATUS_INDEX_NEW)
            head_map = dmm.DMM.from_bytes(head_blob.read_raw())
            merged_map = merge_map(staged_map, head_map)

        # write to the index
        blob_id = repo.create_blob(merged_map.to_bytes())
        repo.index.add(pygit2.IndexEntry(path, blob_id, index_entry.mode))
        changed += 1

        # write to the working directory if that's clean
        worktree_dirty = status & (pygit2.GIT_STATUS_WT_DELETED | pygit2.GIT_STATUS_WT_MODIFIED)
        if worktree_dirty:
            print(f"Warning: {path} has unindexed changes, not overwriting them")
        else:
            merged_map.to_file(os.path.join(repo.workdir, path))

    if changed:
        repo.index.write()
        print(f"Merged {changed} maps.")
    return 0
# Script entry point: open the repository discovered from the current
# directory and use main's return value as the exit status.
if __name__ == '__main__':
    exit(main(pygit2.Repository(pygit2.discover_repository(os.getcwd()))))

View File

@@ -0,0 +1,3 @@
@echo off
rem Installs the packages listed in requirements.txt with pip, then waits
rem for a key press.
python -m pip install -r requirements.txt
pause

View File

@@ -0,0 +1,2 @@
pygit2==0.26.0
bidict==0.13.1

View File

@@ -0,0 +1,5 @@
@echo off
rem Runs convert.py with the map folder set to ../../_maps/ and TGM=0,
rem then waits for a key press.
set MAPROOT=../../_maps/
set TGM=0
python convert.py
pause