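# Run the SheetJS library inside the Duktape JavaScript engine from Python.
# The Duktape C API is reached through ctypes; workbooks are parsed by
# xlsx.full.min.js inside the JS heap, and data is exchanged with pandas
# DataFrames as CSV / JSON text.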
lib = "libduktape.207.20700.so"  # Duktape 2.7.0 shared library (name is build-specific)

from ctypes import CDLL, byref, string_at, c_int, c_void_p, c_char_p, c_size_t
from json import loads
from io import StringIO
from pandas import read_csv

duk = CDLL(lib)

def str_to_c(s):
    # encode a Python string as UTF-8 and return [pointer, byte length] for ctypes calls
    b = s.encode("utf8")
    return [c_char_p(b), len(b)]

def duk_create_heap_default():
    # create a Duktape heap with default allocators and no fatal error handler
    duk.duk_create_heap.restype = c_void_p
    return duk.duk_create_heap(None, None, None, None, None)

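# The helpers below re-create Duktape's duk_eval_string / duk_peval convenience
# macros, which are C preprocessor macros and therefore not exported by the
# shared library.  The integer flag values correspond to the DUK_COMPILE_*
# constants in duktape.h (Duktape 2.x).
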
def duk_eval_string_noresult(ctx, cmd):
    # DUK_COMPILE_EVAL | NOSOURCE | STRLEN | NORESULT | NOFILENAME
    [s, l] = str_to_c(cmd)
    return duk.duk_eval_raw(ctx, s, l, 1 | (1<<3) | (1<<9) | (1<<10) | (1<<8) | (1<<11))

def duk_eval_string(ctx, cmd):
    # DUK_COMPILE_EVAL | NOSOURCE | STRLEN | NOFILENAME; leaves the result on the stack
    [s, l] = str_to_c(cmd)
    return duk.duk_eval_raw(ctx, s, l, 0 | (1<<3) | (1<<9) | (1<<10) | (1<<11))

def duk_peval(ctx):
    # protected eval of the string at the stack top: DUK_COMPILE_EVAL | SAFE | NOFILENAME
    return duk.duk_eval_raw(ctx, None, 0, 1 | (1<<3) | (1<<7) | (1<<11))

def duk_get_string(ctx, idx):
    # read the string at stack index idx and decode it to a Python str
    duk.duk_get_string.restype = c_char_p
    retval = duk.duk_get_string(ctx, idx)
    return retval.decode("utf8")

def eval_file(ctx, path):
    # read a script from disk, push it onto the Duktape stack, and evaluate it
    with open(path, "r") as f:
        code = f.read()
    [s, l] = str_to_c(code)

    duk.duk_push_lstring(ctx, s, l)
    retval = duk_peval(ctx)
    duk.duk_pop(ctx)
    return retval

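# load_file / save_file move raw bytes between Python and the Duktape heap.
# load_file exposes the file contents as an external buffer (flags 1 | 2 =
# dynamic | external) that points directly at the Python bytes object, so the
# caller must keep that object alive for as long as the buffer is referenced.
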
def load_file(ctx, path, var):
    # read the file, point a Duktape external buffer at the bytes, and publish
    # the buffer under the global name `var`
    with open(path, "rb") as f:
        data = f.read()
    ptr = c_char_p(data)
    duk.duk_push_buffer_raw(ctx, 0, 1 | 2)
    duk.duk_config_buffer(ctx, -1, ptr, len(data))
    duk.duk_put_global_string(ctx, str_to_c(var)[0])
    return data

def save_file(ctx, path, var):
    # fetch the global buffer named `var`, copy its bytes out, and write them to disk
    duk.duk_get_global_string(ctx, str_to_c(var)[0])
    sz = c_size_t()
    duk.duk_get_buffer_data.restype = c_void_p
    buf = duk.duk_get_buffer_data(ctx, -1, byref(sz))
    s = string_at(buf, sz.value)
    with open(path, "wb") as f:
        f.write(s)

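# initialize() expects shim.min.js and xlsx.full.min.js in the current working
# directory (the paths below are relative); both files ship with the SheetJS
# standalone distribution.
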
def initialize():
    # create the Duktape heap
    context = duk_create_heap_default()
    ctx = c_void_p(context)

    # duktape does not expose a standard "global" by default
    duk_eval_string_noresult(ctx, "var global = (function(){ return this; }).call(null);")

    # load the SheetJS library
    eval_file(ctx, "shim.min.js")
    eval_file(ctx, "xlsx.full.min.js")

    # print the library version string as a sanity check
    duk_eval_string(ctx, "XLSX.version")
    print(f"SheetJS Library Version {duk_get_string(ctx, -1)}")
    duk.duk_pop(ctx)
    return [context, ctx]

def parse_file(ctx, path, name):
    # read the file into a Duktape buffer named "buf"
    # NOTE: the returned bytes are captured here so Python does not GC the
    # memory backing the external buffer while it is still in use
    data = load_file(ctx, path, "buf")

    # parse the workbook and store it in a global JS variable named `name`
    duk_eval_string_noresult(ctx, f"{name} = XLSX.read(buf.slice(0, buf.length));")

def get_sheet_names(ctx, wb):
    # round-trip the sheet name list through JSON.stringify / json.loads
    duk_eval_string(ctx, f"JSON.stringify({wb}.SheetNames)")
    wsnames = duk_get_string(ctx, -1)
    names = loads(wsnames)
    duk.duk_pop(ctx)
    return names

def get_csv_from_wb(ctx, wb, sheet_name=None):
    # default to the first sheet; otherwise quote the name as a JS string literal
    if not sheet_name: sheet_name = f"{wb}.SheetNames[0]"
    else: sheet_name = f"'{sheet_name}'"
    duk_eval_string(ctx, f"XLSX.utils.sheet_to_csv({wb}.Sheets[{sheet_name}])")
    csv = duk_get_string(ctx, -1)
    duk.duk_pop(ctx)
    return csv

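# DataFrames travel in both directions as text: exports go DataFrame.to_json ->
# XLSX.utils.json_to_sheet -> XLSX.write, while imports go
# XLSX.utils.sheet_to_csv -> pandas.read_csv.
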
def export_df_to_wb(ctx, df, path, sheet_name="Sheet1", book_type=None):
    # hand the DataFrame to JS as a JSON string under the global name "json"
    json = df.to_json(orient="records")
    [s, l] = str_to_c(json)
    duk.duk_push_lstring(ctx, s, l)
    duk.duk_put_global_string(ctx, str_to_c("json")[0])
    # infer the output format from the file extension unless one was given
    if not book_type: book_type = path.split(".")[-1]
    # build a workbook and write it to an in-memory buffer named "newbuf"
    duk_eval_string_noresult(ctx, f"""
        aoo = JSON.parse(json);
        newws = XLSX.utils.json_to_sheet(aoo);
        newwb = XLSX.utils.book_new(newws, '{sheet_name}');
        newbuf = XLSX.write(newwb, {{type:'buffer', bookType:'{book_type}'}});
    """)
    save_file(ctx, path, "newbuf")

def get_df_from_wb(ctx, wb, sheet_name=None):
    # convert a worksheet to CSV in JS, then parse it with pandas
    csv = get_csv_from_wb(ctx, wb, sheet_name)
    return read_csv(StringIO(csv))

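# The classes below wrap the low-level helpers in a small object API:
# SheetJSWrapper manages the Duktape heap as a context manager, SheetJS tracks
# parsed workbooks by their JS variable names, and SheetJSWorkbook exposes the
# sheet names and DataFrames of a single workbook.
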
class SheetJSWorkbook(object):
    def __init__(self, sheetjs, wb):
        self.ctx = sheetjs.ctx
        self.wb = wb

    def get_sheet_names(self):
        return get_sheet_names(self.ctx, self.wb)

    def get_df(self, sheet_name=None):
        if sheet_name is None: sheet_name = self.get_sheet_names()[0]
        return get_df_from_wb(self.ctx, self.wb, sheet_name)

class SheetJS(object):
    def __init__(self, ctx):
        self.ctx = ctx
        self.wb_names = []

    def read_file(self, path):
        # each workbook gets a unique JS variable name (wb0, wb1, ...)
        self.wb_names.append(f"wb{len(self.wb_names)}")
        parse_file(self.ctx, path, self.wb_names[-1])
        return SheetJSWorkbook(self, self.wb_names[-1])

    def write_df(self, df, path, sheet_name="Sheet1"):
        # default the sheet name here rather than passing None through to
        # export_df_to_wb, which would otherwise name the sheet 'None'
        export_df_to_wb(self.ctx, df, path, sheet_name)

class SheetJSWrapper(object):
    def __enter__(self):
        [context, ctx] = initialize()
        self.context = context
        self.ctx = ctx
        return SheetJS(ctx)

    def __exit__(self, exc_type, exc_value, traceback):
        # tear down the Duktape heap when the `with` block exits
        duk.duk_destroy_heap(self.ctx)
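
# Minimal usage sketch: assumes a workbook named "pres.xlsx" exists in the
# working directory and that the Duktape shared library and SheetJS scripts
# referenced above are available; adjust the file names for your environment.
if __name__ == "__main__":
    with SheetJSWrapper() as sheetjs:
        wb = sheetjs.read_file("pres.xlsx")
        print("Sheets:", wb.get_sheet_names())
        df = wb.get_df()
        print(df.head())
        # write the DataFrame back out as a new workbook
        sheetjs.write_df(df, "out.xlsx", sheet_name="Export")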