diff options
author | Juan J. Martinez <jjm@usebox.net> | 2023-11-05 11:22:55 +0000 |
---|---|---|
committer | Juan J. Martinez <jjm@usebox.net> | 2023-11-05 11:31:28 +0000 |
commit | 2fbdf974338bde8576efdae40a819a76b2391033 (patch) | |
tree | 64d41a37470143f142344f9a439d96de3e7918c2 /tools/map.py | |
download | kitsunes-curse-2fbdf974338bde8576efdae40a819a76b2391033.tar.gz kitsunes-curse-2fbdf974338bde8576efdae40a819a76b2391033.zip |
Initial import of the open source release
Diffstat (limited to 'tools/map.py')
-rwxr-xr-x | tools/map.py | 460 |
1 files changed, 460 insertions, 0 deletions
diff --git a/tools/map.py b/tools/map.py new file mode 100755 index 0000000..5e51330 --- /dev/null +++ b/tools/map.py @@ -0,0 +1,460 @@ +#!/usr/bin/env python3 + +import os +import sys +from argparse import ArgumentParser +import json +import subprocess +import tempfile +from collections import defaultdict +from math import ceil +import traceback + +__version__ = "1.0" + +DEF_ROOM_WIDTH = 20 +DEF_ROOM_HEIGHT = 20 +DEF_TS_SIZE = 16 +DEF_BITS = 4 + +# entity types +# unused shouldn't be used :) +TYPES = ("unused", "fill", "link", "door", "platform", "spirit", + "flame", "vampire", "oni", "ninja", "spider", "demon", "cloud", + "torch", "switch", "gem", "key", "potion", "gtail", "start", + "blocked", "sdoor",) + +# entity weight; for cases where the number of sprites is different than 1 (e.g. no sprite, or 2) +TYPE_W = {"unused": None, "fill": None, "link": None, "sdoor": None, + "demon": 2, "ninja": 2} +# persistent types +TYPES_P = ("door", "potion", "key", "gtail", "gem",) + +NO_ID = 255 + + +def aplib_compress(data): + with tempfile.NamedTemporaryFile() as fd: + fd.write(bytearray(data)) + fd.flush() + + ap_name = fd.name + ".ap" + subprocess.call(["apultra", fd.name, ap_name], stdout=sys.stderr) + + with open(ap_name, "rb") as fd: + out = fd.read() + os.unlink(ap_name) + return [int(byte) for byte in out] + + +def zx7_compress(data): + with tempfile.NamedTemporaryFile() as fd: + fd.write(bytearray(data)) + fd.flush() + + zx7_name = fd.name + ".zx7" + subprocess.call(["zx7", "-f", fd.name], stdout=sys.stderr) + + with open(zx7_name, "rb") as fd: + out = fd.read() + os.unlink(zx7_name) + return [int(byte) for byte in out] + + +def ucl_compress(data): + p = subprocess.Popen( + ["ucl", ], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + output, err = p.communicate(bytearray(data)) + return [int(byte) for byte in output] + + +def find_name(data, name): + for item in data: + if item.get("name").lower() == name.lower(): + return item + raise ValueError("%r not found" % name) + + +def find_id(data, id): + for item in data: + if item.get("id") == id: + return item + raise ValueError("id %r not found", id) + + +def get_property(obj, name, default): + props = obj.get("properties", {}) + + if isinstance(props, dict): + return props.get(name, default) + + for p in props: + if p["name"] == name: + return p["value"] + + return default + + +def main(): + + parser = ArgumentParser(description="Map importer", + epilog="Copyright (C) 2020 Juan J Martinez <jjm@usebox.net>", + ) + + parser.add_argument( + "--version", action="version", version="%(prog)s " + __version__) + parser.add_argument( + "--room-width", dest="rw", default=DEF_ROOM_WIDTH, type=int, + help="room width (default: %s)" % DEF_ROOM_WIDTH) + parser.add_argument( + "--room-height", dest="rh", default=DEF_ROOM_HEIGHT, type=int, + help="room height (default: %s)" % DEF_ROOM_HEIGHT) + parser.add_argument("--ts-size", dest="tsz", default=DEF_TS_SIZE, type=int, + help="tileset size (default: %s)" % DEF_TS_SIZE) + parser.add_argument("--max-ents", dest="max_ents", default=0, type=int, + help="max entities per room (default: unlimited)") + parser.add_argument("-b", dest="bin", action="store_true", + help="output binary data (default: C code)") + parser.add_argument("--ucl", dest="ucl", action="store_true", + help="UCL compressed") + parser.add_argument("--zx7", dest="zx7", action="store_true", + help="ZX7 compressed") + parser.add_argument("--aplib", dest="aplib", action="store_true", + help="APLIB compressed") + parser.add_argument("-q", dest="quiet", action="store_true", + help="Don't output stats on stderr") + parser.add_argument("map_json", help="Map to import") + parser.add_argument("id", help="variable name") + + args = parser.parse_args() + + if [args.ucl, args.zx7, args.aplib].count(True) > 1: + parser.error( + "Can't use more than one compression type at the same time") + + if args.ucl: + compress = ucl_compress + compressor = "UCL" + elif args.zx7: + compress = zx7_compress + compressor = "ZX7" + elif args.aplib: + compress = aplib_compress + compressor = "APLIB" + else: + compress = None + compressor = "none" + + with open(args.map_json, "rt") as fd: + data = json.load(fd) + + mh = data.get("height", 0) + mw = data.get("width", 0) + + if mh < args.rh or mh % args.rh: + parser.error("Map size height not multiple of the room size") + if mw < args.rw or mw % args.rw: + parser.error("Map size witdh not multiple of the room size") + + tilewidth = data["tilewidth"] + tileheight = data["tileheight"] + + tile_layer = find_name(data["layers"], "Map")["data"] + + def_tileset = find_name(data["tilesets"], "default") + firstgid = def_tileset.get("firstgid") + + tilesets = [] + out = [] + for y in range(0, mh, args.rh): + for x in range(0, mw, args.rw): + block = [] + ts = None + for j in range(args.rh): + for i in range(args.rw): + block.append(tile_layer[x + i + (y + j) * mw] - firstgid) + + # pack + current = [] + for i in range(0, args.rh * args.rw, 8 // DEF_BITS): + tiles = [] + for k in range(8 // DEF_BITS): + tiles.append(block[i + k]) + + # store the tileset + if ts is None: + ts = tiles[0] // args.tsz + if ts > 15: + parser.error("Too many tilesets (%d)" % (ts + 1)) + + # don't allow mixed tilesets + for k in range(8 // DEF_BITS): + if tiles[k] // args.tsz != ts: + parser.error("Mixed tilesets in map %d" % len(out)) + + # correct the tile as it was 1st tileset + if tiles[0] > args.tsz - 1: + for k in range(8 // DEF_BITS): + tiles[k] %= args.tsz + + b = 0 + pos = 8 + for k in range(8 // DEF_BITS): + pos -= DEF_BITS + b |= (tiles[k] & ((2**DEF_BITS) - 1)) << pos + + current.append(b) + + tilesets.append(ts) + out.append(current) + + # track empty maps + empty = [] + for i, block in enumerate(out): + if all([byte == 0xff for byte in block]): + empty.append(i) + + if compress: + compressed = [] + for i, block in enumerate(out): + if i in empty: + compressed.append(None) + continue + compressed.append(compress(block)) + out = compressed + + # add the map header + for i in range(len(out)): + if out[i] is None: + continue + size = len(out[i]) + if size > 255: + # unlikely + parser.error("map %i size is larger than 255" % i) + + out[i] = [size, tilesets[i], ] + out[i] + + entities_layer = find_name(data["layers"], "Entities") + if len(entities_layer): + start = None + map_ents = defaultdict(list) + map_ents_w = defaultdict(int) + pid_cnt = 0 + + gems = 0 + + try: + objs = sorted( + entities_layer["objects"], key=lambda o: TYPES.index(o["name"].lower()), reverse=True) + except ValueError: + parser.error("map has an unnamed object") + for obj in objs: + name = obj["name"].lower() + m = ((obj["x"] // tilewidth) // args.rw) \ + + (((obj["y"] // tileheight) // args.rh) * (mw // args.rw)) + x = obj["x"] % (args.rw * tilewidth) + y = obj["y"] % (args.rh * tileheight) + + if name == "start": + start = (m, x, y) + continue + + if name == "gem": + gems += 1 + + if name == "blocked": + if obj["width"] > obj["height"]: + if y == 0: + # up blocked + out[m][1] |= (1 << 4) + else: + # down blocked + out[m][1] |= (1 << 5) + else: + if x == 0: + # left blocked + out[m][1] |= (1 << 6) + else: + # tight blocked + out[m][1] |= (1 << 7) + continue + + t = TYPES.index(name) + + if name in TYPES_P: + pid = pid_cnt + pid_cnt += 1 + else: + pid = NO_ID + + # MSB is direction + + param = int(get_property(obj, "param", 0)) + if param == 1: + t |= 128 + + if args.max_ents: + # update the entity count per map + try: + if TYPE_W[name] is not None: + map_ents_w[m] += TYPE_W[name] + except KeyError: + # no entry, assume 1 + map_ents_w[m] += 1 + + special = None + + # specials + if name == "fill": + # tile to fill with + pid = get_property(obj, "tile", 0) + # fill length + special = obj["width"] // tilewidth + + if get_property(obj, "fixed", None) is not None: + if obj["width"] >= obj["height"]: + special = obj["width"] - tilewidth + if not param: + x += special + else: + special = obj["height"] - tileheight + if not param: + y += special + + if name == "link": + target_id = get_property(obj, "target", None) + if target_id is None: + parser.error("link object (%s) with no target" % obj["id"]) + + target = find_id(objs, target_id) + back = get_property(target, "target", None) + if back != obj["id"]: + parser.error( + "link object (%s) link back is missing" % obj["id"]) + + # if not in first line, must be down + if y != 0: + t |= 128 + else: + t &= 127 + + # pid will target the map + pid = ((target["x"] // tilewidth) // args.rw) \ + + (((target["y"] // tileheight) // args.rh) + * (mw // args.rw)) + + target_x = target["x"] % (args.rw * tilewidth) + + # only up/down supported + # x will have any offset, y unused + y = 0 + x = (target_x - x) & 0xff + + if name == "switch": + target_id = get_property(obj, "target", None) + if target_id is None: + parser.error("link object (%s) with no target" % obj["id"]) + + target = find_id(objs, target_id) + + # not persistent + pid = 0xff + target_x = target["x"] % (args.rw * tilewidth) + target_y = target["y"] % (args.rh * tileheight) + special = (target_x, target_y) + + # managed by the switch + if name == "sdoor": + continue + + map_ents[m].extend([t, pid, x, y]) + if special is not None: + if isinstance(special, (tuple, list)): + map_ents[m].extend(special) + else: + map_ents[m].append(special) + + if args.max_ents: + for i, weight in map_ents_w.items(): + if weight > args.max_ents: + parser.error("map %i has %d entities, max is %d" % + (i, weight, args.max_ents)) + + # append the entities to the map data + for i in range(len(out)): + if not out[i]: + continue + elif map_ents[i]: + out[i].extend(map_ents[i]) + # terminator + out[i].append(0xff) + + if start is None: + parser.error("No start found") + + if args.bin: + # untested; to be used loading from disc + sys.stdout.write(bytearray(out)) + return + + print("#ifndef _%s_H" % args.id.upper()) + print("#define _%s_H" % args.id.upper()) + print("/* compressed: %s */" % compressor) + print("#define PERSISTENCE_LEN %d\n" % max(ceil(pid_cnt / 8.), 1)) + print("#define WMAPS %d\n" % (mw // args.rw)) + print("#define MAPS %d\n" % len(out)) + + if start is not None: + m, x, y = start + print("#define START_MAP %d" % m) + print("#define START_X %d" % x) + print("#define START_Y %d" % y) + + print("#ifdef LOCAL") + + # includes a map table for fast access + data_out = "" + for i, block in enumerate(out): + if not isinstance(block, list): + continue + data_out_part = "" + for part in range(0, len(block), args.rw // 2): + if data_out_part: + data_out_part += ",\n" + data_out_part += ', '.join( + ["0x%02x" % byte for byte in block[part: part + args.rw // 2]]) + data_out += "const unsigned char %s_%d[%d] = {\n" % ( + args.id, i, len(block)) + data_out += data_out_part + "\n};\n" + + data_out += "const unsigned char * const %s[%d] = { " % (args.id, len(out)) + data_out += ', '.join( + ["%s_%d" % (args.id, + i) if i not in empty else "(unsigned char *)0" for i in range(len(out))]) + data_out += " };\n" + print(data_out) + print("unsigned char persistence[PERSISTENCE_LEN];\n") + + print("#else") + print("extern const unsigned char * const %s[%d];\n" % (args.id, len(out))) + print("extern unsigned char persistence[PERSISTENCE_LEN];\n") + + print("#endif // LOCAL") + print("#endif // _%s_H" % args.id.upper()) + + if not args.quiet: + screen_with_data = len(out) - len(empty) + total_bytes = sum(len(b) if b else 0 for b in out) + print("%s: %s (%d screens, %d bytes, %.2f bytes avg) - %d gems" % ( + os.path.basename(sys.argv[0]), args.id, + screen_with_data, total_bytes, total_bytes / screen_with_data, + gems), file=sys.stderr) + + +if __name__ == "__main__": + try: + main() + except Exception as ex: + print("FATAL: %s\n***" % ex, file=sys.stderr) + traceback.print_exc(ex) + sys.exit(1) |