From bc9242a6de8eab96b30574037aaa0b6f86eaac5a Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 09:31:56 +0000 Subject: [PATCH 1/8] saving progress incase anything funky happens --- native/Makefile | 20 +++--- native/mock_k.c | 148 ++++++++++++++++++++++++++++++++++++++++++ native/mock_k.h | 138 +++++++-------------------------------- native/obj/.gitignore | 4 -- native/ocamlshmipc.c | 61 +++++++++++++++++ native/shmipc.c | 27 +++----- native/shmipc.h | 34 ++++++++++ native/shmmain.c | 30 +++++++-- 8 files changed, 311 insertions(+), 151 deletions(-) create mode 100644 native/mock_k.c delete mode 100644 native/obj/.gitignore create mode 100644 native/ocamlshmipc.c create mode 100644 native/shmipc.h diff --git a/native/Makefile b/native/Makefile index a31f869..abec847 100644 --- a/native/Makefile +++ b/native/Makefile @@ -6,8 +6,8 @@ detected_OS := $(shell uname -s) IDIR=. CC=gcc -CFLAGS=-DKXVER=3 -fPIC -I$(IDIR) -Wall -CDFLAGS=-shared +CFLAGS=-DKXVER=3 -fPIC -I$(IDIR) -Wall -g3 -gdwarf -g2 +#CDFLAGS=-shared ifeq ($(detected_OS),Darwin) # Mac OS X CDFLAGS += -undefined dynamic_lookup endif @@ -17,18 +17,22 @@ endif ODIR=obj LIBS=-lm -DEPS = k.h wire.h +DEPS=k.h wire.h mock_k.h shmipc.h +OBJECTS=$(ODIR)/shmipc.o $(ODIR)/mock_k.o -all: obj/hpet.so obj/shmipc.so obj/shmmain +all: $(ODIR)/mock.o $(ODIR)/shmipc.o $(ODIR)/shmmain k.h: wget https://raw.githubusercontent.com/KxSystems/kdb/master/c/c/k.h -$(ODIR)/%.so: %.c $(DEPS) - $(CC) -o $@ $< $(CFLAGS) $(CDFLAGS) +$(ODIR)/mock_k.o: mock_k.c $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) $(CDFLAGS) -$(ODIR)/shmmain: shmmain.c shmipc.c mock_k.h $(DEPS) - $(CC) -o $@ $< $(CFLAGS) -g -O0 +$(ODIR)/shmipc.o: shmipc.c $(ODIR)/mock_k.o $(DEPS) + $(CC) -c -o $@ $< mock_k.o $(CFLAGS) $(CDFLAGS) + +$(ODIR)/shmmain: shmmain.c $(DEPS) $(OBJECTS) + $(CC) -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 coverage: obj/shmcov obj/shmcov -v -a WORLD :demo/stress diff --git a/native/mock_k.c b/native/mock_k.c new file mode 100644 index 0000000..3ebab39 --- /dev/null +++ b/native/mock_k.c @@ -0,0 +1,148 @@ +// Copyright 2018 Tea Engineering Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// stub out kx layer (the extern function in k.h) so we can run in main, profile, +// valgrind native code etc. +// +// It's impossible to find the source of memory leaks with the Kx slab allocator +// as valgrind can't hook the individual allocations to record the stack. +// + +#include +#include + +#include "k.h" + +#define KERR -128 + +// globals +int kxx_errno = 0; +char* kxx_msg = NULL; + +K krr(const S msg) { + printf("'%s\n", msg); + kxx_errno = -1; + kxx_msg = msg; + return (K)NULL; +} +K orr(const S msg) { + printf("%s\n", msg); + kxx_errno = -1; + kxx_msg = msg; + return (K)NULL; +} +K ee(K ignored) { + if (kxx_errno != 0) { + K r = ktn(KERR, 1); + r->s = kxx_msg; + kxx_errno = 0; + return r; + } + return ignored; +} +K ki(int i) { + K r = ktn(-KI, 0); + r->i = i; + return r; +} +K kj(long long i) { + K r = ktn(-KJ, 0); + r->j = i; + return r; +} +K kss(const char* ss) { + K r = ktn(-KS, 0); + r->s = (char*)ss; + return r; +} +K dl(void* fnptr, J n) { + K r = ktn(100,0); + r->s = fnptr; + //r->a = n; + return r; +} + +typedef K (*kfunc_1arg)(K); +typedef K (*kfunc_2arg)(K,K); +typedef K (*kfunc_3arg)(K,K,K); + +K dot(K x, K y) { // call function pointer in x with args in mixed list y + if (x->t != 100) return krr("x must be fptr"); + if (y->t != 0) return krr("y must be list"); + //if (x->a == 2) { // not sure why this check was there + kfunc_2arg fptr = (kfunc_2arg)x->s; + return fptr(kK(y)[0], kK(y)[1]); +} + +K knk(int n, ...) { // create a mixed list from K's in varg + va_list ap; + K r = ktn(0, n); + va_start(ap, n); //Requires the last fixed parameter (to get the address) + for(int j=0; j 19) ? 8 : sizefor[abs(type)]; + K r = malloc(sizeof(struct k0) + n*sz); + r->r = 0; + r->t = type; + if (n > 0) r->n = n; // keep: trap accessing n for atom in valgrind + return r; +} + +// dummy serialiser returns a single byte array [SOH] +K b9(I mode, K obj) { + K r = ktn(KB,1); + r->G0[0] = 1; + return r; +} + +K d9(K obj) { + return ki(1); +} + +int okx(K obj) { return 1; } +// repl equivelent wrapper (protected eval, with and without gc) +K pe(K x) { + if (kxx_errno != 0) exit(-1); + return x; +} +void per(K x) { + pe(x); + if (x != NULL) r0(x); +} + +void r0(K x) { // Decrement the object‘s reference count + if (x == 0) { printf("Bug r0 of null pointer %p\n", x); return; } + if (x->r < 0) printf("Bug double-free of %p\n", x); + if (x->r == 0) { + if (x->t == 0) for (int i = 0; i < x->n; i++) r0(kK(x)[i]); + // flip? + // dict? + free(x); + } else { + x->r--; + } +} +K r1(K x) { // Increment the object‘s reference count + x->r++; + return x; +} + diff --git a/native/mock_k.h b/native/mock_k.h index 4d30f5d..9222423 100644 --- a/native/mock_k.h +++ b/native/mock_k.h @@ -18,126 +18,32 @@ // It's impossible to find the source of memory leaks with the Kx slab allocator // as valgrind can't hook the individual allocations to record the stack. // +#ifndef MOCK_K +#define MOCK_K -// globals -int kxx_errno = 0; -char* kxx_msg = NULL; +#define KERR -128 -K krr(const S msg) { - printf("'%s\n", msg); - kxx_errno = -1; - kxx_msg = msg; - return (K)NULL; -} -K orr(const S msg) { - printf("%s\n", msg); - kxx_errno = -1; - kxx_msg = msg; - return (K)NULL; -} -K ee(K ignored) { - if (kxx_errno != 0) { - K r = ktn(KERR, 1); - r->s = kxx_msg; - kxx_errno = 0; - return r; - } - return ignored; -} -K ki(int i) { - K r = ktn(-KI, 0); - r->i = i; - return r; -} -K kj(long long i) { - K r = ktn(-KJ, 0); - r->j = i; - return r; -} -K kss(const char* ss) { - K r = ktn(-KS, 0); - r->s = (char*)ss; - return r; -} -K dl(void* fnptr, int n) { - K r = ktn(100,0); - r->s = fnptr; - r->a = n; - return r; -} +K krr(const S msg); +K orr(const S msg); +K ee(K ignored); +K ki(int i); +K kj(long long i); +K kss(const char* ss); +K dl(void* fnptr, J n); typedef K (*kfunc_1arg)(K); typedef K (*kfunc_2arg)(K,K); typedef K (*kfunc_3arg)(K,K,K); -K dot(K x, K y) { // call function pointer in x with args in mixed list y - if (x->t != 100) return krr("x must be fptr"); - if (y->t != 0) return krr("y must be list"); - if (x->a == 2) { - kfunc_2arg fptr = (kfunc_2arg)x->s; - return fptr(kK(y)[0], kK(y)[1]); - } - return ki(1); -} - -K knk(int n, ...) { // create a mixed list from K's in varg - va_list ap; - K r = ktn(0, n); - va_start(ap, n); //Requires the last fixed parameter (to get the address) - for(int j=0; j 19) ? 8 : sizefor[abs(type)]; - K r = malloc(sizeof(struct k0) + n*sz); - r->r = 0; - r->t = type; - if (n > 0) r->n = n; // keep: trap accessing n for atom in valgrind - return r; -} - -// dummy serialiser returns a single byte array [SOH] -K b9(I mode, K obj) { - K r = ktn(KB,1); - r->G0[0] = 1; - return r; -} - -K d9(K obj) { - return ki(1); -} - -int okx(K obj) { return 1; } -// repl equivelent wrapper (protected eval, with and without gc) -K pe(K x) { - if (kxx_errno != 0) exit(-1); - return x; -} -void per(K x) { - pe(x); - if (x != NULL) r0(x); -} - -void r0(K x) { // Decrement the object‘s reference count - if (x == 0) { printf("Bug r0 of null pointer %p\n", x); return; } - if (x->r < 0) printf("Bug double-free of %p\n", x); - if (x->r == 0) { - if (x->t == 0) for (int i = 0; i < x->n; i++) r0(kK(x)[i]); - // flip? - // dict? - free(x); - } else { - x->r--; - } -} -K r1(K x) { // Increment the object‘s reference count - x->r++; - return x; -} - +K dot(K x, K y); +K knk(int n, ...); +K ktn(int type, long long n);// dummy serialiser returns a single byte array [SOH] +K b9(I mode, K obj); +K d9(K obj); +int okx(K obj); +K pe(K x); +void per(K x); +void r0(K x); +K r1(K x); + +#endif \ No newline at end of file diff --git a/native/obj/.gitignore b/native/obj/.gitignore deleted file mode 100644 index 86d0cb2..0000000 --- a/native/obj/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# Ignore everything in this directory -* -# Except this file -!.gitignore \ No newline at end of file diff --git a/native/ocamlshmipc.c b/native/ocamlshmipc.c new file mode 100644 index 0000000..3d1208b --- /dev/null +++ b/native/ocamlshmipc.c @@ -0,0 +1,61 @@ +#include "shmipc.h" +#include "mock_k.h" +#include +#include +#include + +// Store the passed in funptr so we can wrap it +void(*extern_cb_funptr)(uint64_t, const char*, uint64_t); + +K kdb_to_ocaml_cb(K x, K y) { + extern_cb_funptr(x->j, y, y->n); + return ki(5); +} + +int parse_data_raw(unsigned char* base, int lim, uint64_t index, void* userdata) { + printf(" pdr text: %llu '%.*s'\n", index, lim, base); + char* c = malloc((lim)*sizeof(char)); + memcpy(c, base, lim); + extern_cb_funptr(index, c, lim); + free(c); + return 0; +} +const parsedata_f* parser = &parse_data_raw; + +int append_data_raw(unsigned char* base, int lim, int* sz, K msg) { + return 0; +} +const appenddata_f* appender = &append_data_raw; + +K append_check_raw(queue_t* queue, K msg) { + return msg; +} +const encodecheck_f* encoder = &append_check_raw; + + + +void OCAMLshmipc_open_and_poll(const char* dirpath, void(*cb_funptr)(uint64_t, uint64_t)) { + K dir = kss(dirpath); + K parser = kss("text"); + uint64_t index = 0; + per(shmipc_init(dir, parser, appender, encoder)); + + extern_cb_funptr = cb_funptr; + K cb = dl(&kdb_to_ocaml_cb, 2); + K kindex = kj(index); + per(shmipc_tailer(dir, cb, kindex)); + per(shmipc_peek(dir)); + per(shmipc_debug((K)NULL)); + + while (1) { + usleep(500*1000); + per(shmipc_peek(dir)); + } + + per(shmipc_close(dir)); + + r0(dir); + r0(parser); + r0(cb); + r0(index); +} \ No newline at end of file diff --git a/native/shmipc.c b/native/shmipc.c index 9edc244..292a93c 100644 --- a/native/shmipc.c +++ b/native/shmipc.c @@ -29,7 +29,9 @@ #include #include +#include "shmipc.h" #include "k.h" +#include "mock_k.h" #include "wire.h" /** @@ -259,13 +261,13 @@ int dispatch_callback(tailer_t* tailer, uint64_t index, K obj) { int parse_data_text(unsigned char* base, int lim, uint64_t index, void* userdata) { tailer_t* tailer = (tailer_t*)userdata; - if (debug) printf(" text: %" PRIu64 " '%.*s'\n", index, lim, base); - + //printf(" pdt text: %" PRIu64 " '%.*s'\n", index, lim, base); // prep args and fire callback if (tailer->callback && index > tailer->dispatch_after) { K msg = ktn(KC, lim); // don't free this, handed over to q interp memcpy((char*)msg->G0, base, lim); return dispatch_callback(tailer, index, msg); + } else { } return 0; } @@ -317,18 +319,16 @@ K append_check_kx(queue_t* queue, K msg) { return r; } -K shmipc_init(K dir, K parser) { + +K shmipc_init(K dir, parsedata_f* parser, appenddata_f* appender, encodecheck_f* encoder) { if (dir->t != -KS) return krr("dir is not symbol"); if (dir->s[0] != ':') return krr("dir is not symbol handle (starts with :)"); - if (parser->t != -KS) return krr("parser is not symbol"); debug = getenv("SHMIPC_DEBUG"); wire_trace = getenv("SHMIPC_WIRETRACE"); pid_header = (getpid() & HD_MASK_LENGTH); - printf("shmipc: opening dir %s format %s\n", dir->s, parser->s); - // check if queue already open queue_t* queue = queue_head; while (queue != NULL) { @@ -417,18 +417,9 @@ K shmipc_init(K dir, K parser) { // cycleShift = Math.max(32, Maths.intLog2(indexCount) * 2 + Maths.intLog2(indexSpacing)); // verify user-specified parser for data segments - if (strncmp(parser->s, "text", parser->n) == 0) { - queue->parser = &parse_data_text; - queue->encoder = &append_data_text; - queue->encodecheck = &append_check_text; - } else if (strncmp(parser->s, "kx", parser->n) == 0) { - queue->parser = &parse_data_kx; - queue->encoder = &append_data_kx; - queue->encodecheck = &append_check_kx; - } else { - return krr("bad format: supports `kx and `text"); - } - if (debug) printf("shmipc: format set to %.*s\n", (int)parser->n, parser->s); + queue->parser = parser; + queue->encoder = appender; + queue->encodecheck = encoder; // Good to use queue->next = queue_head; diff --git a/native/shmipc.h b/native/shmipc.h new file mode 100644 index 0000000..393132b --- /dev/null +++ b/native/shmipc.h @@ -0,0 +1,34 @@ +#ifndef SHIMPC +#define SHIMPC + +#include + +#include "k.h" + +//typedef struct tailer tailer_t; +typedef struct queue queue_t; + +typedef int (*parsedata_f)(unsigned char*,int,uint64_t,void* userdata); +typedef int (*appenddata_f)(unsigned char*,int,int*,K); +typedef K (*encodecheck_f)(struct queue*,K); + +int parse_data_text(unsigned char* base, int lim, uint64_t index, void* userdata); +int append_data_text(unsigned char* base, int lim, int* sz, K msg); +K append_check_text(queue_t* queue, K msg); + +int parse_data_kx(unsigned char* base, int lim, uint64_t index, void* userdata); +int append_data_kx(unsigned char* base, int lim, int* sz, K msg); +K append_check_kx(queue_t* queue, K msg); + + +K shmipc_init(K dir, parsedata_f* parser, appenddata_f* appender, encodecheck_f* encoder); + +K shmipc_peek(K x); + +K shmipc_tailer(K dir, K cb, K kindex); + +K shmipc_debug(K x); + +K shmipc_close(K dir); + +#endif \ No newline at end of file diff --git a/native/shmmain.c b/native/shmmain.c index 1b49c37..97d814c 100644 --- a/native/shmmain.c +++ b/native/shmmain.c @@ -12,10 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include +#include +#include #include -#include "mock_k.h" +#include +#include +#include + +#include "k.h" +#include "shmipc.h" // This is a stand-alone tool for replaying a queue for use with valgrind etc that are tricky to // operate within KDB, e.g. with @@ -142,9 +148,23 @@ int main(const int argc, char **argv) { } // what follows is translated q calls from shmipc.q + K parser_type = kss(kxflag ? "kx" : "text"); K dir = kss(argv[optind]); - K parser = kss(kxflag ? "kx" : "text"); - per(shmipc_init(dir, parser)); + parsedata_f parser; + appenddata_f appender; + encodecheck_f encoder; + if (strncmp(parser_type->s, "text", parser_type->n) == 0) { + parser = &parse_data_text; + appender = &append_data_text; + encoder = &append_check_text; + } else if (strncmp(parser_type->s, "kx", parser_type->n) == 0) { + parser = &parse_data_kx; + appender = &append_data_kx; + encoder = &append_check_kx; + } else { + return krr("bad format: supports `kx and `text"); + } + per(shmipc_init(dir, parser, appender, encoder)); K cb = dl(&printxy, 2); K kindex = kj(index); @@ -244,7 +264,7 @@ int main(const int argc, char **argv) { per(shmipc_close(dir)); r0(dir); - r0(parser); + r0(parser_type); r0(cb); r0(kindex); From a9696519b4dfe726634836c27b4189856bca7cc2 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 09:50:36 +0000 Subject: [PATCH 2/8] got it working again --- .gitignore | 1 + native/Makefile | 4 ++-- native/shmmain.c | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index c33cca7..991421b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .DS_Store native/k.h +native/obj diff --git a/native/Makefile b/native/Makefile index abec847..ea9b2c6 100644 --- a/native/Makefile +++ b/native/Makefile @@ -20,7 +20,7 @@ LIBS=-lm DEPS=k.h wire.h mock_k.h shmipc.h OBJECTS=$(ODIR)/shmipc.o $(ODIR)/mock_k.o -all: $(ODIR)/mock.o $(ODIR)/shmipc.o $(ODIR)/shmmain +all: $(ODIR)/mock_k.o $(ODIR)/shmipc.o $(ODIR)/shmmain k.h: wget https://raw.githubusercontent.com/KxSystems/kdb/master/c/c/k.h @@ -29,7 +29,7 @@ $(ODIR)/mock_k.o: mock_k.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) $(CDFLAGS) $(ODIR)/shmipc.o: shmipc.c $(ODIR)/mock_k.o $(DEPS) - $(CC) -c -o $@ $< mock_k.o $(CFLAGS) $(CDFLAGS) + $(CC) -c -o $@ $< $(ODIR)/mock_k.o $(CFLAGS) $(CDFLAGS) $(ODIR)/shmmain: shmmain.c $(DEPS) $(OBJECTS) $(CC) -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 diff --git a/native/shmmain.c b/native/shmmain.c index 97d814c..8c6c326 100644 --- a/native/shmmain.c +++ b/native/shmmain.c @@ -22,6 +22,7 @@ #include "k.h" #include "shmipc.h" +#include "mock_k.h" // This is a stand-alone tool for replaying a queue for use with valgrind etc that are tricky to // operate within KDB, e.g. with From 01d8a238df1217d7e03ff8275e6e7a8df736616e Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 09:54:31 +0000 Subject: [PATCH 3/8] updating --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 991421b..8b95a14 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .DS_Store native/k.h native/obj +.vscode From 93e0e1414014d194abe6d03cce3fcd5a8b5793a6 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 16:50:51 +0000 Subject: [PATCH 4/8] updates and working --- native/Makefile | 10 ++++++++-- native/ocamlshmipc.c | 18 ++++++++++-------- native/ocamlshmipcmain.c | 24 ++++++++++++++++++++++++ native/shmmain.c | 1 + 4 files changed, 43 insertions(+), 10 deletions(-) create mode 100644 native/ocamlshmipcmain.c diff --git a/native/Makefile b/native/Makefile index ea9b2c6..82ee610 100644 --- a/native/Makefile +++ b/native/Makefile @@ -20,7 +20,7 @@ LIBS=-lm DEPS=k.h wire.h mock_k.h shmipc.h OBJECTS=$(ODIR)/shmipc.o $(ODIR)/mock_k.o -all: $(ODIR)/mock_k.o $(ODIR)/shmipc.o $(ODIR)/shmmain +all: $(ODIR)/mock_k.o $(ODIR)/shmipc.o $(ODIR)/shmmain $(ODIR)/libocamlshmipc.so $(ODIR)/ocamlshmipcmain k.h: wget https://raw.githubusercontent.com/KxSystems/kdb/master/c/c/k.h @@ -34,6 +34,12 @@ $(ODIR)/shmipc.o: shmipc.c $(ODIR)/mock_k.o $(DEPS) $(ODIR)/shmmain: shmmain.c $(DEPS) $(OBJECTS) $(CC) -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 +$(ODIR)/libocamlshmipc.so: ocamlshmipc.c $(OBJECTS) + $(CC) -shared -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 + +$(ODIR)/ocamlshmipcmain: ocamlshmipcmain.c $(DEPS) $(OBJECTS) + $(CC) -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 + coverage: obj/shmcov obj/shmcov -v -a WORLD :demo/stress /Library/Developer/CommandLineTools/usr/bin/llvm-profdata merge default.profraw -output=default.profout @@ -54,7 +60,7 @@ fuzz: $(ODIR)/shmmain.fuzz afl-fuzz -i test/fuzz_input -o test/fuzz_output $(ODIR)/shmmain.fuzz -F - :../java/out $(ODIR)/shmmain.fuzz: shmmain.c shmipc.c mock_k.h $(DEPS) - /usr/local/Cellar/afl-fuzz/2.52b/bin/afl-clang -o $@ $< $(CFLAGS) -g -O0 + /home/jonathan/dev/git/AFL -o $@ $< $(CFLAGS) -g -O0 .PHONY: clean grind coverage syms fuzz diff --git a/native/ocamlshmipc.c b/native/ocamlshmipc.c index 3d1208b..dff12ab 100644 --- a/native/ocamlshmipc.c +++ b/native/ocamlshmipc.c @@ -4,6 +4,12 @@ #include #include +/* +Easy to use ocaml bindings for the 'open and tailing' use case. + +OCAMLshmipc_open_and_poll takes a directory and callback function that will get called on each CQ message. +*/ + // Store the passed in funptr so we can wrap it void(*extern_cb_funptr)(uint64_t, const char*, uint64_t); @@ -13,8 +19,8 @@ K kdb_to_ocaml_cb(K x, K y) { } int parse_data_raw(unsigned char* base, int lim, uint64_t index, void* userdata) { - printf(" pdr text: %llu '%.*s'\n", index, lim, base); - char* c = malloc((lim)*sizeof(char)); + char* c = malloc((lim+1)*sizeof(char)); + c[lim] = '\0'; memcpy(c, base, lim); extern_cb_funptr(index, c, lim); free(c); @@ -34,9 +40,8 @@ const encodecheck_f* encoder = &append_check_raw; -void OCAMLshmipc_open_and_poll(const char* dirpath, void(*cb_funptr)(uint64_t, uint64_t)) { +void OCAMLshmipc_open_and_poll(const char* dirpath, void(*cb_funptr)(uint64_t, const char*, uint64_t)) { K dir = kss(dirpath); - K parser = kss("text"); uint64_t index = 0; per(shmipc_init(dir, parser, appender, encoder)); @@ -45,8 +50,6 @@ void OCAMLshmipc_open_and_poll(const char* dirpath, void(*cb_funptr)(uint64_t, u K kindex = kj(index); per(shmipc_tailer(dir, cb, kindex)); per(shmipc_peek(dir)); - per(shmipc_debug((K)NULL)); - while (1) { usleep(500*1000); per(shmipc_peek(dir)); @@ -55,7 +58,6 @@ void OCAMLshmipc_open_and_poll(const char* dirpath, void(*cb_funptr)(uint64_t, u per(shmipc_close(dir)); r0(dir); - r0(parser); r0(cb); r0(index); -} \ No newline at end of file +} diff --git a/native/ocamlshmipcmain.c b/native/ocamlshmipcmain.c new file mode 100644 index 0000000..75d4db0 --- /dev/null +++ b/native/ocamlshmipcmain.c @@ -0,0 +1,24 @@ +#include "shmipc.h" +#include "mock_k.h" +#include +#include +#include + +#include "ocamlshmipc.c" + +/* +A simple debugging tool for ocamlshmipc +*/ + + +void cb_test(uint64_t index, const char* data, uint64_t len) +{ + printf("\t\tcb_test index:%llu len:%llu data:'%.*s'\n", index, len, len, data); +} + +int main(const int argc, char** argv) +{ + char* dir = ":/home/jonathan/dev/git/c-chronicle-q/queue"; + OCAMLshmipc_open_and_poll(dir, &cb_test); + return 0; +} \ No newline at end of file diff --git a/native/shmmain.c b/native/shmmain.c index 8c6c326..a336e10 100644 --- a/native/shmmain.c +++ b/native/shmmain.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include From d412ef3cdc25f040064167a991d8ef1ac68712b2 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 19:10:57 +0000 Subject: [PATCH 5/8] reverting change --- native/Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native/Makefile b/native/Makefile index 82ee610..ce434cc 100644 --- a/native/Makefile +++ b/native/Makefile @@ -60,7 +60,8 @@ fuzz: $(ODIR)/shmmain.fuzz afl-fuzz -i test/fuzz_input -o test/fuzz_output $(ODIR)/shmmain.fuzz -F - :../java/out $(ODIR)/shmmain.fuzz: shmmain.c shmipc.c mock_k.h $(DEPS) - /home/jonathan/dev/git/AFL -o $@ $< $(CFLAGS) -g -O0 + /usr/local/Cellar/afl-fuzz/2.52b/bin/afl-clang -o $@ $< $(CFLAGS) -g -O0 + .PHONY: clean grind coverage syms fuzz From dda2a1477acb105238d4eb36fc1c9e3a5284379a Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 19:14:07 +0000 Subject: [PATCH 6/8] updates --- native/shmipc.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/native/shmipc.c b/native/shmipc.c index 292a93c..26b2ab8 100644 --- a/native/shmipc.c +++ b/native/shmipc.c @@ -261,14 +261,13 @@ int dispatch_callback(tailer_t* tailer, uint64_t index, K obj) { int parse_data_text(unsigned char* base, int lim, uint64_t index, void* userdata) { tailer_t* tailer = (tailer_t*)userdata; - //printf(" pdt text: %" PRIu64 " '%.*s'\n", index, lim, base); + if (debug) printf(" pdt text: %" PRIu64 " '%.*s'\n", index, lim, base); // prep args and fire callback if (tailer->callback && index > tailer->dispatch_after) { K msg = ktn(KC, lim); // don't free this, handed over to q interp memcpy((char*)msg->G0, base, lim); return dispatch_callback(tailer, index, msg); - } else { - } + } return 0; } From 1270c7329755f0e0d2b0921e794fa385068be59e Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 19:23:04 +0000 Subject: [PATCH 7/8] updates --- native/Makefile | 1 + native/shmipc.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/native/Makefile b/native/Makefile index ce434cc..85c880f 100644 --- a/native/Makefile +++ b/native/Makefile @@ -25,6 +25,7 @@ all: $(ODIR)/mock_k.o $(ODIR)/shmipc.o $(ODIR)/shmmain $(ODIR)/libocamlshmipc.so k.h: wget https://raw.githubusercontent.com/KxSystems/kdb/master/c/c/k.h + $(ODIR)/mock_k.o: mock_k.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) $(CDFLAGS) diff --git a/native/shmipc.c b/native/shmipc.c index 26b2ab8..65c03e6 100644 --- a/native/shmipc.c +++ b/native/shmipc.c @@ -261,7 +261,7 @@ int dispatch_callback(tailer_t* tailer, uint64_t index, K obj) { int parse_data_text(unsigned char* base, int lim, uint64_t index, void* userdata) { tailer_t* tailer = (tailer_t*)userdata; - if (debug) printf(" pdt text: %" PRIu64 " '%.*s'\n", index, lim, base); + if (debug) printf(" text: %" PRIu64 " '%.*s'\n", index, lim, base); // prep args and fire callback if (tailer->callback && index > tailer->dispatch_after) { K msg = ktn(KC, lim); // don't free this, handed over to q interp From 8cc9e91a30618c1e9172b75488664886104337e6 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 23 Feb 2020 09:31:56 +0000 Subject: [PATCH 8/8] Take at OCAML bindings --- .gitignore | 2 + native/Makefile | 28 +++++--- native/mock_k.c | 148 +++++++++++++++++++++++++++++++++++++++ native/mock_k.h | 138 ++++++------------------------------ native/obj/.gitignore | 4 -- native/ocamlshmipc.c | 63 +++++++++++++++++ native/ocamlshmipcmain.c | 24 +++++++ native/shmipc.c | 26 +++---- native/shmipc.h | 34 +++++++++ native/shmmain.c | 30 ++++++-- 10 files changed, 347 insertions(+), 150 deletions(-) create mode 100644 native/mock_k.c delete mode 100644 native/obj/.gitignore create mode 100644 native/ocamlshmipc.c create mode 100644 native/ocamlshmipcmain.c create mode 100644 native/shmipc.h diff --git a/.gitignore b/.gitignore index c33cca7..8b95a14 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .DS_Store native/k.h +native/obj +.vscode diff --git a/native/Makefile b/native/Makefile index a31f869..85c880f 100644 --- a/native/Makefile +++ b/native/Makefile @@ -6,8 +6,8 @@ detected_OS := $(shell uname -s) IDIR=. CC=gcc -CFLAGS=-DKXVER=3 -fPIC -I$(IDIR) -Wall -CDFLAGS=-shared +CFLAGS=-DKXVER=3 -fPIC -I$(IDIR) -Wall -g3 -gdwarf -g2 +#CDFLAGS=-shared ifeq ($(detected_OS),Darwin) # Mac OS X CDFLAGS += -undefined dynamic_lookup endif @@ -17,18 +17,29 @@ endif ODIR=obj LIBS=-lm -DEPS = k.h wire.h +DEPS=k.h wire.h mock_k.h shmipc.h +OBJECTS=$(ODIR)/shmipc.o $(ODIR)/mock_k.o -all: obj/hpet.so obj/shmipc.so obj/shmmain +all: $(ODIR)/mock_k.o $(ODIR)/shmipc.o $(ODIR)/shmmain $(ODIR)/libocamlshmipc.so $(ODIR)/ocamlshmipcmain k.h: wget https://raw.githubusercontent.com/KxSystems/kdb/master/c/c/k.h -$(ODIR)/%.so: %.c $(DEPS) - $(CC) -o $@ $< $(CFLAGS) $(CDFLAGS) -$(ODIR)/shmmain: shmmain.c shmipc.c mock_k.h $(DEPS) - $(CC) -o $@ $< $(CFLAGS) -g -O0 +$(ODIR)/mock_k.o: mock_k.c $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) $(CDFLAGS) + +$(ODIR)/shmipc.o: shmipc.c $(ODIR)/mock_k.o $(DEPS) + $(CC) -c -o $@ $< $(ODIR)/mock_k.o $(CFLAGS) $(CDFLAGS) + +$(ODIR)/shmmain: shmmain.c $(DEPS) $(OBJECTS) + $(CC) -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 + +$(ODIR)/libocamlshmipc.so: ocamlshmipc.c $(OBJECTS) + $(CC) -shared -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 + +$(ODIR)/ocamlshmipcmain: ocamlshmipcmain.c $(DEPS) $(OBJECTS) + $(CC) -o $@ $< $(OBJECTS) $(CFLAGS) -g -O0 coverage: obj/shmcov obj/shmcov -v -a WORLD :demo/stress @@ -52,6 +63,7 @@ fuzz: $(ODIR)/shmmain.fuzz $(ODIR)/shmmain.fuzz: shmmain.c shmipc.c mock_k.h $(DEPS) /usr/local/Cellar/afl-fuzz/2.52b/bin/afl-clang -o $@ $< $(CFLAGS) -g -O0 + .PHONY: clean grind coverage syms fuzz clean: diff --git a/native/mock_k.c b/native/mock_k.c new file mode 100644 index 0000000..3ebab39 --- /dev/null +++ b/native/mock_k.c @@ -0,0 +1,148 @@ +// Copyright 2018 Tea Engineering Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// stub out kx layer (the extern function in k.h) so we can run in main, profile, +// valgrind native code etc. +// +// It's impossible to find the source of memory leaks with the Kx slab allocator +// as valgrind can't hook the individual allocations to record the stack. +// + +#include +#include + +#include "k.h" + +#define KERR -128 + +// globals +int kxx_errno = 0; +char* kxx_msg = NULL; + +K krr(const S msg) { + printf("'%s\n", msg); + kxx_errno = -1; + kxx_msg = msg; + return (K)NULL; +} +K orr(const S msg) { + printf("%s\n", msg); + kxx_errno = -1; + kxx_msg = msg; + return (K)NULL; +} +K ee(K ignored) { + if (kxx_errno != 0) { + K r = ktn(KERR, 1); + r->s = kxx_msg; + kxx_errno = 0; + return r; + } + return ignored; +} +K ki(int i) { + K r = ktn(-KI, 0); + r->i = i; + return r; +} +K kj(long long i) { + K r = ktn(-KJ, 0); + r->j = i; + return r; +} +K kss(const char* ss) { + K r = ktn(-KS, 0); + r->s = (char*)ss; + return r; +} +K dl(void* fnptr, J n) { + K r = ktn(100,0); + r->s = fnptr; + //r->a = n; + return r; +} + +typedef K (*kfunc_1arg)(K); +typedef K (*kfunc_2arg)(K,K); +typedef K (*kfunc_3arg)(K,K,K); + +K dot(K x, K y) { // call function pointer in x with args in mixed list y + if (x->t != 100) return krr("x must be fptr"); + if (y->t != 0) return krr("y must be list"); + //if (x->a == 2) { // not sure why this check was there + kfunc_2arg fptr = (kfunc_2arg)x->s; + return fptr(kK(y)[0], kK(y)[1]); +} + +K knk(int n, ...) { // create a mixed list from K's in varg + va_list ap; + K r = ktn(0, n); + va_start(ap, n); //Requires the last fixed parameter (to get the address) + for(int j=0; j 19) ? 8 : sizefor[abs(type)]; + K r = malloc(sizeof(struct k0) + n*sz); + r->r = 0; + r->t = type; + if (n > 0) r->n = n; // keep: trap accessing n for atom in valgrind + return r; +} + +// dummy serialiser returns a single byte array [SOH] +K b9(I mode, K obj) { + K r = ktn(KB,1); + r->G0[0] = 1; + return r; +} + +K d9(K obj) { + return ki(1); +} + +int okx(K obj) { return 1; } +// repl equivelent wrapper (protected eval, with and without gc) +K pe(K x) { + if (kxx_errno != 0) exit(-1); + return x; +} +void per(K x) { + pe(x); + if (x != NULL) r0(x); +} + +void r0(K x) { // Decrement the object‘s reference count + if (x == 0) { printf("Bug r0 of null pointer %p\n", x); return; } + if (x->r < 0) printf("Bug double-free of %p\n", x); + if (x->r == 0) { + if (x->t == 0) for (int i = 0; i < x->n; i++) r0(kK(x)[i]); + // flip? + // dict? + free(x); + } else { + x->r--; + } +} +K r1(K x) { // Increment the object‘s reference count + x->r++; + return x; +} + diff --git a/native/mock_k.h b/native/mock_k.h index 4d30f5d..9222423 100644 --- a/native/mock_k.h +++ b/native/mock_k.h @@ -18,126 +18,32 @@ // It's impossible to find the source of memory leaks with the Kx slab allocator // as valgrind can't hook the individual allocations to record the stack. // +#ifndef MOCK_K +#define MOCK_K -// globals -int kxx_errno = 0; -char* kxx_msg = NULL; +#define KERR -128 -K krr(const S msg) { - printf("'%s\n", msg); - kxx_errno = -1; - kxx_msg = msg; - return (K)NULL; -} -K orr(const S msg) { - printf("%s\n", msg); - kxx_errno = -1; - kxx_msg = msg; - return (K)NULL; -} -K ee(K ignored) { - if (kxx_errno != 0) { - K r = ktn(KERR, 1); - r->s = kxx_msg; - kxx_errno = 0; - return r; - } - return ignored; -} -K ki(int i) { - K r = ktn(-KI, 0); - r->i = i; - return r; -} -K kj(long long i) { - K r = ktn(-KJ, 0); - r->j = i; - return r; -} -K kss(const char* ss) { - K r = ktn(-KS, 0); - r->s = (char*)ss; - return r; -} -K dl(void* fnptr, int n) { - K r = ktn(100,0); - r->s = fnptr; - r->a = n; - return r; -} +K krr(const S msg); +K orr(const S msg); +K ee(K ignored); +K ki(int i); +K kj(long long i); +K kss(const char* ss); +K dl(void* fnptr, J n); typedef K (*kfunc_1arg)(K); typedef K (*kfunc_2arg)(K,K); typedef K (*kfunc_3arg)(K,K,K); -K dot(K x, K y) { // call function pointer in x with args in mixed list y - if (x->t != 100) return krr("x must be fptr"); - if (y->t != 0) return krr("y must be list"); - if (x->a == 2) { - kfunc_2arg fptr = (kfunc_2arg)x->s; - return fptr(kK(y)[0], kK(y)[1]); - } - return ki(1); -} - -K knk(int n, ...) { // create a mixed list from K's in varg - va_list ap; - K r = ktn(0, n); - va_start(ap, n); //Requires the last fixed parameter (to get the address) - for(int j=0; j 19) ? 8 : sizefor[abs(type)]; - K r = malloc(sizeof(struct k0) + n*sz); - r->r = 0; - r->t = type; - if (n > 0) r->n = n; // keep: trap accessing n for atom in valgrind - return r; -} - -// dummy serialiser returns a single byte array [SOH] -K b9(I mode, K obj) { - K r = ktn(KB,1); - r->G0[0] = 1; - return r; -} - -K d9(K obj) { - return ki(1); -} - -int okx(K obj) { return 1; } -// repl equivelent wrapper (protected eval, with and without gc) -K pe(K x) { - if (kxx_errno != 0) exit(-1); - return x; -} -void per(K x) { - pe(x); - if (x != NULL) r0(x); -} - -void r0(K x) { // Decrement the object‘s reference count - if (x == 0) { printf("Bug r0 of null pointer %p\n", x); return; } - if (x->r < 0) printf("Bug double-free of %p\n", x); - if (x->r == 0) { - if (x->t == 0) for (int i = 0; i < x->n; i++) r0(kK(x)[i]); - // flip? - // dict? - free(x); - } else { - x->r--; - } -} -K r1(K x) { // Increment the object‘s reference count - x->r++; - return x; -} - +K dot(K x, K y); +K knk(int n, ...); +K ktn(int type, long long n);// dummy serialiser returns a single byte array [SOH] +K b9(I mode, K obj); +K d9(K obj); +int okx(K obj); +K pe(K x); +void per(K x); +void r0(K x); +K r1(K x); + +#endif \ No newline at end of file diff --git a/native/obj/.gitignore b/native/obj/.gitignore deleted file mode 100644 index 86d0cb2..0000000 --- a/native/obj/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# Ignore everything in this directory -* -# Except this file -!.gitignore \ No newline at end of file diff --git a/native/ocamlshmipc.c b/native/ocamlshmipc.c new file mode 100644 index 0000000..dff12ab --- /dev/null +++ b/native/ocamlshmipc.c @@ -0,0 +1,63 @@ +#include "shmipc.h" +#include "mock_k.h" +#include +#include +#include + +/* +Easy to use ocaml bindings for the 'open and tailing' use case. + +OCAMLshmipc_open_and_poll takes a directory and callback function that will get called on each CQ message. +*/ + +// Store the passed in funptr so we can wrap it +void(*extern_cb_funptr)(uint64_t, const char*, uint64_t); + +K kdb_to_ocaml_cb(K x, K y) { + extern_cb_funptr(x->j, y, y->n); + return ki(5); +} + +int parse_data_raw(unsigned char* base, int lim, uint64_t index, void* userdata) { + char* c = malloc((lim+1)*sizeof(char)); + c[lim] = '\0'; + memcpy(c, base, lim); + extern_cb_funptr(index, c, lim); + free(c); + return 0; +} +const parsedata_f* parser = &parse_data_raw; + +int append_data_raw(unsigned char* base, int lim, int* sz, K msg) { + return 0; +} +const appenddata_f* appender = &append_data_raw; + +K append_check_raw(queue_t* queue, K msg) { + return msg; +} +const encodecheck_f* encoder = &append_check_raw; + + + +void OCAMLshmipc_open_and_poll(const char* dirpath, void(*cb_funptr)(uint64_t, const char*, uint64_t)) { + K dir = kss(dirpath); + uint64_t index = 0; + per(shmipc_init(dir, parser, appender, encoder)); + + extern_cb_funptr = cb_funptr; + K cb = dl(&kdb_to_ocaml_cb, 2); + K kindex = kj(index); + per(shmipc_tailer(dir, cb, kindex)); + per(shmipc_peek(dir)); + while (1) { + usleep(500*1000); + per(shmipc_peek(dir)); + } + + per(shmipc_close(dir)); + + r0(dir); + r0(cb); + r0(index); +} diff --git a/native/ocamlshmipcmain.c b/native/ocamlshmipcmain.c new file mode 100644 index 0000000..75d4db0 --- /dev/null +++ b/native/ocamlshmipcmain.c @@ -0,0 +1,24 @@ +#include "shmipc.h" +#include "mock_k.h" +#include +#include +#include + +#include "ocamlshmipc.c" + +/* +A simple debugging tool for ocamlshmipc +*/ + + +void cb_test(uint64_t index, const char* data, uint64_t len) +{ + printf("\t\tcb_test index:%llu len:%llu data:'%.*s'\n", index, len, len, data); +} + +int main(const int argc, char** argv) +{ + char* dir = ":/home/jonathan/dev/git/c-chronicle-q/queue"; + OCAMLshmipc_open_and_poll(dir, &cb_test); + return 0; +} \ No newline at end of file diff --git a/native/shmipc.c b/native/shmipc.c index 9edc244..65c03e6 100644 --- a/native/shmipc.c +++ b/native/shmipc.c @@ -29,7 +29,9 @@ #include #include +#include "shmipc.h" #include "k.h" +#include "mock_k.h" #include "wire.h" /** @@ -260,13 +262,12 @@ int dispatch_callback(tailer_t* tailer, uint64_t index, K obj) { int parse_data_text(unsigned char* base, int lim, uint64_t index, void* userdata) { tailer_t* tailer = (tailer_t*)userdata; if (debug) printf(" text: %" PRIu64 " '%.*s'\n", index, lim, base); - // prep args and fire callback if (tailer->callback && index > tailer->dispatch_after) { K msg = ktn(KC, lim); // don't free this, handed over to q interp memcpy((char*)msg->G0, base, lim); return dispatch_callback(tailer, index, msg); - } + } return 0; } @@ -317,18 +318,16 @@ K append_check_kx(queue_t* queue, K msg) { return r; } -K shmipc_init(K dir, K parser) { + +K shmipc_init(K dir, parsedata_f* parser, appenddata_f* appender, encodecheck_f* encoder) { if (dir->t != -KS) return krr("dir is not symbol"); if (dir->s[0] != ':') return krr("dir is not symbol handle (starts with :)"); - if (parser->t != -KS) return krr("parser is not symbol"); debug = getenv("SHMIPC_DEBUG"); wire_trace = getenv("SHMIPC_WIRETRACE"); pid_header = (getpid() & HD_MASK_LENGTH); - printf("shmipc: opening dir %s format %s\n", dir->s, parser->s); - // check if queue already open queue_t* queue = queue_head; while (queue != NULL) { @@ -417,18 +416,9 @@ K shmipc_init(K dir, K parser) { // cycleShift = Math.max(32, Maths.intLog2(indexCount) * 2 + Maths.intLog2(indexSpacing)); // verify user-specified parser for data segments - if (strncmp(parser->s, "text", parser->n) == 0) { - queue->parser = &parse_data_text; - queue->encoder = &append_data_text; - queue->encodecheck = &append_check_text; - } else if (strncmp(parser->s, "kx", parser->n) == 0) { - queue->parser = &parse_data_kx; - queue->encoder = &append_data_kx; - queue->encodecheck = &append_check_kx; - } else { - return krr("bad format: supports `kx and `text"); - } - if (debug) printf("shmipc: format set to %.*s\n", (int)parser->n, parser->s); + queue->parser = parser; + queue->encoder = appender; + queue->encodecheck = encoder; // Good to use queue->next = queue_head; diff --git a/native/shmipc.h b/native/shmipc.h new file mode 100644 index 0000000..393132b --- /dev/null +++ b/native/shmipc.h @@ -0,0 +1,34 @@ +#ifndef SHIMPC +#define SHIMPC + +#include + +#include "k.h" + +//typedef struct tailer tailer_t; +typedef struct queue queue_t; + +typedef int (*parsedata_f)(unsigned char*,int,uint64_t,void* userdata); +typedef int (*appenddata_f)(unsigned char*,int,int*,K); +typedef K (*encodecheck_f)(struct queue*,K); + +int parse_data_text(unsigned char* base, int lim, uint64_t index, void* userdata); +int append_data_text(unsigned char* base, int lim, int* sz, K msg); +K append_check_text(queue_t* queue, K msg); + +int parse_data_kx(unsigned char* base, int lim, uint64_t index, void* userdata); +int append_data_kx(unsigned char* base, int lim, int* sz, K msg); +K append_check_kx(queue_t* queue, K msg); + + +K shmipc_init(K dir, parsedata_f* parser, appenddata_f* appender, encodecheck_f* encoder); + +K shmipc_peek(K x); + +K shmipc_tailer(K dir, K cb, K kindex); + +K shmipc_debug(K x); + +K shmipc_close(K dir); + +#endif \ No newline at end of file diff --git a/native/shmmain.c b/native/shmmain.c index 1b49c37..a336e10 100644 --- a/native/shmmain.c +++ b/native/shmmain.c @@ -12,9 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include +#include +#include +#include #include +#include +#include +#include + +#include "k.h" +#include "shmipc.h" #include "mock_k.h" // This is a stand-alone tool for replaying a queue for use with valgrind etc that are tricky to @@ -142,9 +150,23 @@ int main(const int argc, char **argv) { } // what follows is translated q calls from shmipc.q + K parser_type = kss(kxflag ? "kx" : "text"); K dir = kss(argv[optind]); - K parser = kss(kxflag ? "kx" : "text"); - per(shmipc_init(dir, parser)); + parsedata_f parser; + appenddata_f appender; + encodecheck_f encoder; + if (strncmp(parser_type->s, "text", parser_type->n) == 0) { + parser = &parse_data_text; + appender = &append_data_text; + encoder = &append_check_text; + } else if (strncmp(parser_type->s, "kx", parser_type->n) == 0) { + parser = &parse_data_kx; + appender = &append_data_kx; + encoder = &append_check_kx; + } else { + return krr("bad format: supports `kx and `text"); + } + per(shmipc_init(dir, parser, appender, encoder)); K cb = dl(&printxy, 2); K kindex = kj(index); @@ -244,7 +266,7 @@ int main(const int argc, char **argv) { per(shmipc_close(dir)); r0(dir); - r0(parser); + r0(parser_type); r0(cb); r0(kindex);