#!/usr/bin/env python3 import os import sys from argparse import ArgumentParser import json import subprocess import tempfile from collections import defaultdict from math import ceil import traceback __version__ = "1.0" DEF_ROOM_WIDTH = 20 DEF_ROOM_HEIGHT = 20 DEF_TS_SIZE = 16 DEF_BITS = 4 # entity types # unused shouldn't be used :) TYPES = ("unused", "fill", "link", "door", "platform", "spirit", "flame", "vampire", "oni", "ninja", "spider", "demon", "cloud", "torch", "switch", "gem", "key", "potion", "gtail", "start", "blocked", "sdoor",) # entity weight; for cases where the number of sprites is different than 1 (e.g. no sprite, or 2) TYPE_W = {"unused": None, "fill": None, "link": None, "sdoor": None, "demon": 2, "ninja": 2} # persistent types TYPES_P = ("door", "potion", "key", "gtail", "gem",) NO_ID = 255 def aplib_compress(data): with tempfile.NamedTemporaryFile() as fd: fd.write(bytearray(data)) fd.flush() ap_name = fd.name + ".ap" subprocess.call(["apultra", fd.name, ap_name], stdout=sys.stderr) with open(ap_name, "rb") as fd: out = fd.read() os.unlink(ap_name) return [int(byte) for byte in out] def zx7_compress(data): with tempfile.NamedTemporaryFile() as fd: fd.write(bytearray(data)) fd.flush() zx7_name = fd.name + ".zx7" subprocess.call(["zx7", "-f", fd.name], stdout=sys.stderr) with open(zx7_name, "rb") as fd: out = fd.read() os.unlink(zx7_name) return [int(byte) for byte in out] def ucl_compress(data): p = subprocess.Popen( ["ucl", ], stdin=subprocess.PIPE, stdout=subprocess.PIPE) output, err = p.communicate(bytearray(data)) return [int(byte) for byte in output] def find_name(data, name): for item in data: if item.get("name").lower() == name.lower(): return item raise ValueError("%r not found" % name) def find_id(data, id): for item in data: if item.get("id") == id: return item raise ValueError("id %r not found", id) def get_property(obj, name, default): props = obj.get("properties", {}) if isinstance(props, dict): return props.get(name, default) for p in props: if p["name"] == name: return p["value"] return default def main(): parser = ArgumentParser(description="Map importer", epilog="Copyright (C) 2020 Juan J Martinez ", ) parser.add_argument( "--version", action="version", version="%(prog)s " + __version__) parser.add_argument( "--room-width", dest="rw", default=DEF_ROOM_WIDTH, type=int, help="room width (default: %s)" % DEF_ROOM_WIDTH) parser.add_argument( "--room-height", dest="rh", default=DEF_ROOM_HEIGHT, type=int, help="room height (default: %s)" % DEF_ROOM_HEIGHT) parser.add_argument("--ts-size", dest="tsz", default=DEF_TS_SIZE, type=int, help="tileset size (default: %s)" % DEF_TS_SIZE) parser.add_argument("--max-ents", dest="max_ents", default=0, type=int, help="max entities per room (default: unlimited)") parser.add_argument("-b", dest="bin", action="store_true", help="output binary data (default: C code)") parser.add_argument("--ucl", dest="ucl", action="store_true", help="UCL compressed") parser.add_argument("--zx7", dest="zx7", action="store_true", help="ZX7 compressed") parser.add_argument("--aplib", dest="aplib", action="store_true", help="APLIB compressed") parser.add_argument("-q", dest="quiet", action="store_true", help="Don't output stats on stderr") parser.add_argument("map_json", help="Map to import") parser.add_argument("id", help="variable name") args = parser.parse_args() if [args.ucl, args.zx7, args.aplib].count(True) > 1: parser.error( "Can't use more than one compression type at the same time") if args.ucl: compress = ucl_compress compressor = "UCL" elif args.zx7: compress = zx7_compress compressor = "ZX7" elif args.aplib: compress = aplib_compress compressor = "APLIB" else: compress = None compressor = "none" with open(args.map_json, "rt") as fd: data = json.load(fd) mh = data.get("height", 0) mw = data.get("width", 0) if mh < args.rh or mh % args.rh: parser.error("Map size height not multiple of the room size") if mw < args.rw or mw % args.rw: parser.error("Map size witdh not multiple of the room size") tilewidth = data["tilewidth"] tileheight = data["tileheight"] tile_layer = find_name(data["layers"], "Map")["data"] def_tileset = find_name(data["tilesets"], "default") firstgid = def_tileset.get("firstgid") tilesets = [] out = [] for y in range(0, mh, args.rh): for x in range(0, mw, args.rw): block = [] ts = None for j in range(args.rh): for i in range(args.rw): block.append(tile_layer[x + i + (y + j) * mw] - firstgid) # pack current = [] for i in range(0, args.rh * args.rw, 8 // DEF_BITS): tiles = [] for k in range(8 // DEF_BITS): tiles.append(block[i + k]) # store the tileset if ts is None: ts = tiles[0] // args.tsz if ts > 15: parser.error("Too many tilesets (%d)" % (ts + 1)) # don't allow mixed tilesets for k in range(8 // DEF_BITS): if tiles[k] // args.tsz != ts: parser.error("Mixed tilesets in map %d" % len(out)) # correct the tile as it was 1st tileset if tiles[0] > args.tsz - 1: for k in range(8 // DEF_BITS): tiles[k] %= args.tsz b = 0 pos = 8 for k in range(8 // DEF_BITS): pos -= DEF_BITS b |= (tiles[k] & ((2**DEF_BITS) - 1)) << pos current.append(b) tilesets.append(ts) out.append(current) # track empty maps empty = [] for i, block in enumerate(out): if all([byte == 0xff for byte in block]): empty.append(i) if compress: compressed = [] for i, block in enumerate(out): if i in empty: compressed.append(None) continue compressed.append(compress(block)) out = compressed # add the map header for i in range(len(out)): if out[i] is None: continue size = len(out[i]) if size > 255: # unlikely parser.error("map %i size is larger than 255" % i) out[i] = [size, tilesets[i], ] + out[i] entities_layer = find_name(data["layers"], "Entities") if len(entities_layer): start = None map_ents = defaultdict(list) map_ents_w = defaultdict(int) pid_cnt = 0 gems = 0 try: objs = sorted( entities_layer["objects"], key=lambda o: TYPES.index(o["name"].lower()), reverse=True) except ValueError: parser.error("map has an unnamed object") for obj in objs: name = obj["name"].lower() m = ((obj["x"] // tilewidth) // args.rw) \ + (((obj["y"] // tileheight) // args.rh) * (mw // args.rw)) x = obj["x"] % (args.rw * tilewidth) y = obj["y"] % (args.rh * tileheight) if name == "start": start = (m, x, y) continue if name == "gem": gems += 1 if name == "blocked": if obj["width"] > obj["height"]: if y == 0: # up blocked out[m][1] |= (1 << 4) else: # down blocked out[m][1] |= (1 << 5) else: if x == 0: # left blocked out[m][1] |= (1 << 6) else: # tight blocked out[m][1] |= (1 << 7) continue t = TYPES.index(name) if name in TYPES_P: pid = pid_cnt pid_cnt += 1 else: pid = NO_ID # MSB is direction param = int(get_property(obj, "param", 0)) if param == 1: t |= 128 if args.max_ents: # update the entity count per map try: if TYPE_W[name] is not None: map_ents_w[m] += TYPE_W[name] except KeyError: # no entry, assume 1 map_ents_w[m] += 1 special = None # specials if name == "fill": # tile to fill with pid = get_property(obj, "tile", 0) # fill length special = obj["width"] // tilewidth if get_property(obj, "fixed", None) is not None: if obj["width"] >= obj["height"]: special = obj["width"] - tilewidth if not param: x += special else: special = obj["height"] - tileheight if not param: y += special if name == "link": target_id = get_property(obj, "target", None) if target_id is None: parser.error("link object (%s) with no target" % obj["id"]) target = find_id(objs, target_id) back = get_property(target, "target", None) if back != obj["id"]: parser.error( "link object (%s) link back is missing" % obj["id"]) # if not in first line, must be down if y != 0: t |= 128 else: t &= 127 # pid will target the map pid = ((target["x"] // tilewidth) // args.rw) \ + (((target["y"] // tileheight) // args.rh) * (mw // args.rw)) target_x = target["x"] % (args.rw * tilewidth) # only up/down supported # x will have any offset, y unused y = 0 x = (target_x - x) & 0xff if name == "switch": target_id = get_property(obj, "target", None) if target_id is None: parser.error("link object (%s) with no target" % obj["id"]) target = find_id(objs, target_id) # not persistent pid = 0xff target_x = target["x"] % (args.rw * tilewidth) target_y = target["y"] % (args.rh * tileheight) special = (target_x, target_y) # managed by the switch if name == "sdoor": continue map_ents[m].extend([t, pid, x, y]) if special is not None: if isinstance(special, (tuple, list)): map_ents[m].extend(special) else: map_ents[m].append(special) if args.max_ents: for i, weight in map_ents_w.items(): if weight > args.max_ents: parser.error("map %i has %d entities, max is %d" % (i, weight, args.max_ents)) # append the entities to the map data for i in range(len(out)): if not out[i]: continue elif map_ents[i]: out[i].extend(map_ents[i]) # terminator out[i].append(0xff) if start is None: parser.error("No start found") if args.bin: # untested; to be used loading from disc sys.stdout.write(bytearray(out)) return print("#ifndef _%s_H" % args.id.upper()) print("#define _%s_H" % args.id.upper()) print("/* compressed: %s */" % compressor) print("#define PERSISTENCE_LEN %d\n" % max(ceil(pid_cnt / 8.), 1)) print("#define WMAPS %d\n" % (mw // args.rw)) print("#define MAPS %d\n" % len(out)) if start is not None: m, x, y = start print("#define START_MAP %d" % m) print("#define START_X %d" % x) print("#define START_Y %d" % y) print("#ifdef LOCAL") # includes a map table for fast access data_out = "" for i, block in enumerate(out): if not isinstance(block, list): continue data_out_part = "" for part in range(0, len(block), args.rw // 2): if data_out_part: data_out_part += ",\n" data_out_part += ', '.join( ["0x%02x" % byte for byte in block[part: part + args.rw // 2]]) data_out += "const unsigned char %s_%d[%d] = {\n" % ( args.id, i, len(block)) data_out += data_out_part + "\n};\n" data_out += "const unsigned char * const %s[%d] = { " % (args.id, len(out)) data_out += ', '.join( ["%s_%d" % (args.id, i) if i not in empty else "(unsigned char *)0" for i in range(len(out))]) data_out += " };\n" print(data_out) print("unsigned char persistence[PERSISTENCE_LEN];\n") print("#else") print("extern const unsigned char * const %s[%d];\n" % (args.id, len(out))) print("extern unsigned char persistence[PERSISTENCE_LEN];\n") print("#endif // LOCAL") print("#endif // _%s_H" % args.id.upper()) if not args.quiet: screen_with_data = len(out) - len(empty) total_bytes = sum(len(b) if b else 0 for b in out) print("%s: %s (%d screens, %d bytes, %.2f bytes avg) - %d gems" % ( os.path.basename(sys.argv[0]), args.id, screen_with_data, total_bytes, total_bytes / screen_with_data, gems), file=sys.stderr) if __name__ == "__main__": try: main() except Exception as ex: print("FATAL: %s\n***" % ex, file=sys.stderr) traceback.print_exc(ex) sys.exit(1)