aboutsummaryrefslogtreecommitdiff
path: root/tools/map.py
diff options
context:
space:
mode:
authorJuan J. Martinez <jjm@usebox.net>2023-11-05 11:22:55 +0000
committerJuan J. Martinez <jjm@usebox.net>2023-11-05 11:31:28 +0000
commit2fbdf974338bde8576efdae40a819a76b2391033 (patch)
tree64d41a37470143f142344f9a439d96de3e7918c2 /tools/map.py
downloadkitsunes-curse-2fbdf974338bde8576efdae40a819a76b2391033.tar.gz
kitsunes-curse-2fbdf974338bde8576efdae40a819a76b2391033.zip
Initial import of the open source release
Diffstat (limited to 'tools/map.py')
-rwxr-xr-xtools/map.py460
1 files changed, 460 insertions, 0 deletions
diff --git a/tools/map.py b/tools/map.py
new file mode 100755
index 0000000..5e51330
--- /dev/null
+++ b/tools/map.py
@@ -0,0 +1,460 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+from argparse import ArgumentParser
+import json
+import subprocess
+import tempfile
+from collections import defaultdict
+from math import ceil
+import traceback
+
+__version__ = "1.0"
+
+DEF_ROOM_WIDTH = 20
+DEF_ROOM_HEIGHT = 20
+DEF_TS_SIZE = 16
+DEF_BITS = 4
+
+# entity types
+# unused shouldn't be used :)
+TYPES = ("unused", "fill", "link", "door", "platform", "spirit",
+ "flame", "vampire", "oni", "ninja", "spider", "demon", "cloud",
+ "torch", "switch", "gem", "key", "potion", "gtail", "start",
+ "blocked", "sdoor",)
+
+# entity weight; for cases where the number of sprites is different than 1 (e.g. no sprite, or 2)
+TYPE_W = {"unused": None, "fill": None, "link": None, "sdoor": None,
+ "demon": 2, "ninja": 2}
+# persistent types
+TYPES_P = ("door", "potion", "key", "gtail", "gem",)
+
+NO_ID = 255
+
+
+def aplib_compress(data):
+ with tempfile.NamedTemporaryFile() as fd:
+ fd.write(bytearray(data))
+ fd.flush()
+
+ ap_name = fd.name + ".ap"
+ subprocess.call(["apultra", fd.name, ap_name], stdout=sys.stderr)
+
+ with open(ap_name, "rb") as fd:
+ out = fd.read()
+ os.unlink(ap_name)
+ return [int(byte) for byte in out]
+
+
+def zx7_compress(data):
+ with tempfile.NamedTemporaryFile() as fd:
+ fd.write(bytearray(data))
+ fd.flush()
+
+ zx7_name = fd.name + ".zx7"
+ subprocess.call(["zx7", "-f", fd.name], stdout=sys.stderr)
+
+ with open(zx7_name, "rb") as fd:
+ out = fd.read()
+ os.unlink(zx7_name)
+ return [int(byte) for byte in out]
+
+
+def ucl_compress(data):
+ p = subprocess.Popen(
+ ["ucl", ], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ output, err = p.communicate(bytearray(data))
+ return [int(byte) for byte in output]
+
+
+def find_name(data, name):
+ for item in data:
+ if item.get("name").lower() == name.lower():
+ return item
+ raise ValueError("%r not found" % name)
+
+
+def find_id(data, id):
+ for item in data:
+ if item.get("id") == id:
+ return item
+ raise ValueError("id %r not found", id)
+
+
+def get_property(obj, name, default):
+ props = obj.get("properties", {})
+
+ if isinstance(props, dict):
+ return props.get(name, default)
+
+ for p in props:
+ if p["name"] == name:
+ return p["value"]
+
+ return default
+
+
+def main():
+
+ parser = ArgumentParser(description="Map importer",
+ epilog="Copyright (C) 2020 Juan J Martinez <jjm@usebox.net>",
+ )
+
+ parser.add_argument(
+ "--version", action="version", version="%(prog)s " + __version__)
+ parser.add_argument(
+ "--room-width", dest="rw", default=DEF_ROOM_WIDTH, type=int,
+ help="room width (default: %s)" % DEF_ROOM_WIDTH)
+ parser.add_argument(
+ "--room-height", dest="rh", default=DEF_ROOM_HEIGHT, type=int,
+ help="room height (default: %s)" % DEF_ROOM_HEIGHT)
+ parser.add_argument("--ts-size", dest="tsz", default=DEF_TS_SIZE, type=int,
+ help="tileset size (default: %s)" % DEF_TS_SIZE)
+ parser.add_argument("--max-ents", dest="max_ents", default=0, type=int,
+ help="max entities per room (default: unlimited)")
+ parser.add_argument("-b", dest="bin", action="store_true",
+ help="output binary data (default: C code)")
+ parser.add_argument("--ucl", dest="ucl", action="store_true",
+ help="UCL compressed")
+ parser.add_argument("--zx7", dest="zx7", action="store_true",
+ help="ZX7 compressed")
+ parser.add_argument("--aplib", dest="aplib", action="store_true",
+ help="APLIB compressed")
+ parser.add_argument("-q", dest="quiet", action="store_true",
+ help="Don't output stats on stderr")
+ parser.add_argument("map_json", help="Map to import")
+ parser.add_argument("id", help="variable name")
+
+ args = parser.parse_args()
+
+ if [args.ucl, args.zx7, args.aplib].count(True) > 1:
+ parser.error(
+ "Can't use more than one compression type at the same time")
+
+ if args.ucl:
+ compress = ucl_compress
+ compressor = "UCL"
+ elif args.zx7:
+ compress = zx7_compress
+ compressor = "ZX7"
+ elif args.aplib:
+ compress = aplib_compress
+ compressor = "APLIB"
+ else:
+ compress = None
+ compressor = "none"
+
+ with open(args.map_json, "rt") as fd:
+ data = json.load(fd)
+
+ mh = data.get("height", 0)
+ mw = data.get("width", 0)
+
+ if mh < args.rh or mh % args.rh:
+ parser.error("Map size height not multiple of the room size")
+ if mw < args.rw or mw % args.rw:
+ parser.error("Map size witdh not multiple of the room size")
+
+ tilewidth = data["tilewidth"]
+ tileheight = data["tileheight"]
+
+ tile_layer = find_name(data["layers"], "Map")["data"]
+
+ def_tileset = find_name(data["tilesets"], "default")
+ firstgid = def_tileset.get("firstgid")
+
+ tilesets = []
+ out = []
+ for y in range(0, mh, args.rh):
+ for x in range(0, mw, args.rw):
+ block = []
+ ts = None
+ for j in range(args.rh):
+ for i in range(args.rw):
+ block.append(tile_layer[x + i + (y + j) * mw] - firstgid)
+
+ # pack
+ current = []
+ for i in range(0, args.rh * args.rw, 8 // DEF_BITS):
+ tiles = []
+ for k in range(8 // DEF_BITS):
+ tiles.append(block[i + k])
+
+ # store the tileset
+ if ts is None:
+ ts = tiles[0] // args.tsz
+ if ts > 15:
+ parser.error("Too many tilesets (%d)" % (ts + 1))
+
+ # don't allow mixed tilesets
+ for k in range(8 // DEF_BITS):
+ if tiles[k] // args.tsz != ts:
+ parser.error("Mixed tilesets in map %d" % len(out))
+
+ # correct the tile as it was 1st tileset
+ if tiles[0] > args.tsz - 1:
+ for k in range(8 // DEF_BITS):
+ tiles[k] %= args.tsz
+
+ b = 0
+ pos = 8
+ for k in range(8 // DEF_BITS):
+ pos -= DEF_BITS
+ b |= (tiles[k] & ((2**DEF_BITS) - 1)) << pos
+
+ current.append(b)
+
+ tilesets.append(ts)
+ out.append(current)
+
+ # track empty maps
+ empty = []
+ for i, block in enumerate(out):
+ if all([byte == 0xff for byte in block]):
+ empty.append(i)
+
+ if compress:
+ compressed = []
+ for i, block in enumerate(out):
+ if i in empty:
+ compressed.append(None)
+ continue
+ compressed.append(compress(block))
+ out = compressed
+
+ # add the map header
+ for i in range(len(out)):
+ if out[i] is None:
+ continue
+ size = len(out[i])
+ if size > 255:
+ # unlikely
+ parser.error("map %i size is larger than 255" % i)
+
+ out[i] = [size, tilesets[i], ] + out[i]
+
+ entities_layer = find_name(data["layers"], "Entities")
+ if len(entities_layer):
+ start = None
+ map_ents = defaultdict(list)
+ map_ents_w = defaultdict(int)
+ pid_cnt = 0
+
+ gems = 0
+
+ try:
+ objs = sorted(
+ entities_layer["objects"], key=lambda o: TYPES.index(o["name"].lower()), reverse=True)
+ except ValueError:
+ parser.error("map has an unnamed object")
+ for obj in objs:
+ name = obj["name"].lower()
+ m = ((obj["x"] // tilewidth) // args.rw) \
+ + (((obj["y"] // tileheight) // args.rh) * (mw // args.rw))
+ x = obj["x"] % (args.rw * tilewidth)
+ y = obj["y"] % (args.rh * tileheight)
+
+ if name == "start":
+ start = (m, x, y)
+ continue
+
+ if name == "gem":
+ gems += 1
+
+ if name == "blocked":
+ if obj["width"] > obj["height"]:
+ if y == 0:
+ # up blocked
+ out[m][1] |= (1 << 4)
+ else:
+ # down blocked
+ out[m][1] |= (1 << 5)
+ else:
+ if x == 0:
+ # left blocked
+ out[m][1] |= (1 << 6)
+ else:
+ # tight blocked
+ out[m][1] |= (1 << 7)
+ continue
+
+ t = TYPES.index(name)
+
+ if name in TYPES_P:
+ pid = pid_cnt
+ pid_cnt += 1
+ else:
+ pid = NO_ID
+
+ # MSB is direction
+
+ param = int(get_property(obj, "param", 0))
+ if param == 1:
+ t |= 128
+
+ if args.max_ents:
+ # update the entity count per map
+ try:
+ if TYPE_W[name] is not None:
+ map_ents_w[m] += TYPE_W[name]
+ except KeyError:
+ # no entry, assume 1
+ map_ents_w[m] += 1
+
+ special = None
+
+ # specials
+ if name == "fill":
+ # tile to fill with
+ pid = get_property(obj, "tile", 0)
+ # fill length
+ special = obj["width"] // tilewidth
+
+ if get_property(obj, "fixed", None) is not None:
+ if obj["width"] >= obj["height"]:
+ special = obj["width"] - tilewidth
+ if not param:
+ x += special
+ else:
+ special = obj["height"] - tileheight
+ if not param:
+ y += special
+
+ if name == "link":
+ target_id = get_property(obj, "target", None)
+ if target_id is None:
+ parser.error("link object (%s) with no target" % obj["id"])
+
+ target = find_id(objs, target_id)
+ back = get_property(target, "target", None)
+ if back != obj["id"]:
+ parser.error(
+ "link object (%s) link back is missing" % obj["id"])
+
+ # if not in first line, must be down
+ if y != 0:
+ t |= 128
+ else:
+ t &= 127
+
+ # pid will target the map
+ pid = ((target["x"] // tilewidth) // args.rw) \
+ + (((target["y"] // tileheight) // args.rh)
+ * (mw // args.rw))
+
+ target_x = target["x"] % (args.rw * tilewidth)
+
+ # only up/down supported
+ # x will have any offset, y unused
+ y = 0
+ x = (target_x - x) & 0xff
+
+ if name == "switch":
+ target_id = get_property(obj, "target", None)
+ if target_id is None:
+ parser.error("link object (%s) with no target" % obj["id"])
+
+ target = find_id(objs, target_id)
+
+ # not persistent
+ pid = 0xff
+ target_x = target["x"] % (args.rw * tilewidth)
+ target_y = target["y"] % (args.rh * tileheight)
+ special = (target_x, target_y)
+
+ # managed by the switch
+ if name == "sdoor":
+ continue
+
+ map_ents[m].extend([t, pid, x, y])
+ if special is not None:
+ if isinstance(special, (tuple, list)):
+ map_ents[m].extend(special)
+ else:
+ map_ents[m].append(special)
+
+ if args.max_ents:
+ for i, weight in map_ents_w.items():
+ if weight > args.max_ents:
+ parser.error("map %i has %d entities, max is %d" %
+ (i, weight, args.max_ents))
+
+ # append the entities to the map data
+ for i in range(len(out)):
+ if not out[i]:
+ continue
+ elif map_ents[i]:
+ out[i].extend(map_ents[i])
+ # terminator
+ out[i].append(0xff)
+
+ if start is None:
+ parser.error("No start found")
+
+ if args.bin:
+ # untested; to be used loading from disc
+ sys.stdout.write(bytearray(out))
+ return
+
+ print("#ifndef _%s_H" % args.id.upper())
+ print("#define _%s_H" % args.id.upper())
+ print("/* compressed: %s */" % compressor)
+ print("#define PERSISTENCE_LEN %d\n" % max(ceil(pid_cnt / 8.), 1))
+ print("#define WMAPS %d\n" % (mw // args.rw))
+ print("#define MAPS %d\n" % len(out))
+
+ if start is not None:
+ m, x, y = start
+ print("#define START_MAP %d" % m)
+ print("#define START_X %d" % x)
+ print("#define START_Y %d" % y)
+
+ print("#ifdef LOCAL")
+
+ # includes a map table for fast access
+ data_out = ""
+ for i, block in enumerate(out):
+ if not isinstance(block, list):
+ continue
+ data_out_part = ""
+ for part in range(0, len(block), args.rw // 2):
+ if data_out_part:
+ data_out_part += ",\n"
+ data_out_part += ', '.join(
+ ["0x%02x" % byte for byte in block[part: part + args.rw // 2]])
+ data_out += "const unsigned char %s_%d[%d] = {\n" % (
+ args.id, i, len(block))
+ data_out += data_out_part + "\n};\n"
+
+ data_out += "const unsigned char * const %s[%d] = { " % (args.id, len(out))
+ data_out += ', '.join(
+ ["%s_%d" % (args.id,
+ i) if i not in empty else "(unsigned char *)0" for i in range(len(out))])
+ data_out += " };\n"
+ print(data_out)
+ print("unsigned char persistence[PERSISTENCE_LEN];\n")
+
+ print("#else")
+ print("extern const unsigned char * const %s[%d];\n" % (args.id, len(out)))
+ print("extern unsigned char persistence[PERSISTENCE_LEN];\n")
+
+ print("#endif // LOCAL")
+ print("#endif // _%s_H" % args.id.upper())
+
+ if not args.quiet:
+ screen_with_data = len(out) - len(empty)
+ total_bytes = sum(len(b) if b else 0 for b in out)
+ print("%s: %s (%d screens, %d bytes, %.2f bytes avg) - %d gems" % (
+ os.path.basename(sys.argv[0]), args.id,
+ screen_with_data, total_bytes, total_bytes / screen_with_data,
+ gems), file=sys.stderr)
+
+
+if __name__ == "__main__":
+ try:
+ main()
+ except Exception as ex:
+ print("FATAL: %s\n***" % ex, file=sys.stderr)
+ traceback.print_exc(ex)
+ sys.exit(1)