summaryrefslogtreecommitdiff
path: root/ocaml/urb.py
diff options
context:
space:
mode:
authorpolwex <polwex@sortug.com>2025-10-05 22:57:55 +0700
committerpolwex <polwex@sortug.com>2025-10-05 22:57:55 +0700
commitc4b71435d9afdb67450f320f54fb7aa99dcae85e (patch)
treea08c4c2f7965a95fcfe6dda09629d3f103d25a0b /ocaml/urb.py
parentfcedfddf00b3f994e4f4e40332ac7fc192c63244 (diff)
fixed jamcue
Diffstat (limited to 'ocaml/urb.py')
-rw-r--r--ocaml/urb.py405
1 files changed, 405 insertions, 0 deletions
diff --git a/ocaml/urb.py b/ocaml/urb.py
new file mode 100644
index 0000000..904233e
--- /dev/null
+++ b/ocaml/urb.py
@@ -0,0 +1,405 @@
+
+#!/usr/bin/env python3
+
+# TODO:
+# - -h text
+# - output to clay
+
+import os
+import sys
+import logging
+import json
+import requests
+import argparse
+import base64
+
+logging.basicConfig(
+ level=logging.WARNING,
+ format='%(levelname)s %(funcName)s %(lineno)s - %(message)s',
+ stream=sys.stderr,
+)
+
+logging.debug(['sys.argv', sys.argv])
+
+def preprocess_args(old_args):
+ """Split out []
+
+ We use [] to delimit tuples. The following syntaxes are all equivalent:
+
+ -- --open --open a b --close --open c d --close --close
+ -- [ [ a b ] [ c d ] ]
+ -- [ [a b] [c d] ]
+ -- [[a b] [c d]]
+ -- etc
+
+ We don't allow [[a b][c d]]. The rule is that we accept zero or more [ at
+ the beginning of a token and zero or more ] at the end of a token.
+
+ In this function, we convert all legal syntaxes to as if they were entered
+ as in the first example above. This allows them to be parsed by a
+ relatively sane argparse system.
+ """
+
+ if old_args == []:
+ return []
+ if old_args[0][0] == '[':
+ if len(old_args[0]) > 1:
+ r = preprocess_args([old_args[0][1:]] + old_args[1:])
+ return ['--open'] + r
+ else:
+ return ['--open'] + preprocess_args(old_args[1:])
+ if old_args[0][-1] == ']':
+ if len(old_args[0]) > 1:
+ return preprocess_args([old_args[0][:-1]]) + \
+ ['--close'] + preprocess_args(old_args[1:])
+ else:
+ return ['--close'] + preprocess_args(old_args[1:])
+ return [old_args[0]] + preprocess_args(old_args[1:])
+
+args = preprocess_args(sys.argv[1:])
+
+logging.debug(['preprocessed', args])
+
+
+class sourceAction(argparse.Action):
+ """Handle source flag.
+
+ This is all the 'primitive' source flags -- no nesting, no tuple stuff,
+ just one flag with one argument.
+
+ Besides the normal argparse.Action arguments, we require the following named
+ argument:
+
+ -- which='foo'. Since all source flags use res.source, this specifies the
+ key of the entry for this flag.
+ """
+
+ def __init__(self, option_strings, dest, **kwargs):
+ self.which = kwargs['which']
+ del kwargs['which']
+ super(sourceAction, self).__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, res, new_value, option_string):
+ logging.debug('%r %r' % (new_value, option_string))
+ logging.debug('source %s' % res.source)
+ logging.debug('level %s' % res.level)
+
+ if res.source is not None:
+ def help(source, level):
+ logging.debug('source %s' % source)
+ logging.debug('level %s' % level)
+ if not isinstance(source, list):
+ raise ValueError('Already specified one source')
+ elif level == 0:
+ msg = 'Already specified a source %r %s' % (source, level)
+ raise ValueError(msg)
+ elif level == 1:
+ return source + [self.construct_value(new_value)]
+ else:
+ return source[:-1] + [help(source[-1], level - 1)]
+ res.source = help(res.source, res.level)
+ else:
+ res.source = \
+ self.construct_value(new_value)
+
+ logging.debug(res.source)
+
+ def construct_value(self, new_value):
+ if new_value == '-':
+ return self.construct_value(''.join(sys.stdin.readlines()))
+ elif new_value[0:2] == '@@':
+ with open(new_value[2:]) as f:
+ content = f.readlines()
+ return self.construct_value(''.join(content))
+ else:
+ return {self.which: new_value}
+
+class transformerAction(argparse.Action):
+ """Handle transformer flag.
+
+ This is all the tranformer flags. Each flag takes one argument and
+ transforms the previous source.
+
+ Besides the normal argparse.Action arguments, we require the following named
+ arguments:
+
+ -- which='foo'. Since all source flags use res.source, this specifies the
+ key of the entry for this flag.
+
+ -- nesting='foo'. The key for the argument is 'foo'.
+ """
+
+ def __init__(self, option_strings, dest, **kwargs):
+ self.which = kwargs['which']
+ self.nesting = kwargs['nesting']
+ del kwargs['which']
+ del kwargs['nesting']
+ super(transformerAction, self).__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, res, new_value, option_string):
+ logging.debug('%r %r' % (new_value, option_string))
+ logging.debug('source %s' % res.source)
+ logging.debug('level %s' % res.level)
+
+ if res.source is None:
+ raise ValueError('Need source before transformer')
+ else:
+ def help(source, level):
+ logging.debug('source %s' % source)
+ logging.debug('level %s' % level)
+ if level == 0 or level is None:
+ res = {self.nesting: new_value, "next": source}
+ return {self.which: res}
+ elif not isinstance(source, list):
+ raise ValueError('Already specified one source')
+ else:
+ return source[:-1] + [help(source[-1], level - 1)]
+ res.source = help(res.source, res.level)
+
+ logging.debug(res.source)
+
+class openAction(argparse.Action):
+ """Handle open tuple.
+
+ Opens a source tuple. Can only exist in the same places as any other
+ source.
+ """
+
+ def __init__(self, option_strings, dest, **kwargs):
+ super(openAction, self).__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, res, new_value, option_string):
+ if res.level is None:
+ res.level = 0
+
+ logging.debug('source %s' % res.source)
+ logging.debug('level %s' % res.level)
+
+ if res.source is None:
+ res.source = []
+ res.level = 1
+ return
+
+ def help(source, level):
+ if not isinstance(source, list):
+ raise ValueError('Starting tuple after source is finished')
+ if level == 1:
+ return (source + [[]], level + 1)
+ elif level > 1:
+ rsource, rlevel = help(source[-1], level - 1)
+ return (source[:-1] + [rsource], rlevel + 1)
+ else:
+ msg = 'opening strange level %r %s' % (source, level)
+ raise ValueError(msg)
+
+ res.source, res.level = help(res.source, res.level)
+
+class closeAction(argparse.Action):
+ """Handle close tuple.
+
+ Closes a source tuple. Can only exist when a tuple is already open.
+ """
+
+ def __init__(self, option_strings, dest, **kwargs):
+ super(closeAction, self).__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, res, new_value, option_string):
+ if res.level is None:
+ raise ValueError('Ending tuple before starting one')
+
+ logging.debug('level %s' % res.level)
+
+ if res.source is None:
+ raise ValueError('Ending tuple with empty source')
+
+ def help(source, level):
+ if not isinstance(source, list):
+ raise ValueError('Ending tuple that isn\'t a tuple')
+ if level == 1:
+ return level - 1
+ elif level > 1:
+ return help(source[-1], level - 1) + 1
+ else:
+ msg = 'closing strange level %r %s' % (source, level)
+ raise ValueError(msg)
+
+ res.level = help(res.source, res.level)
+
+ logging.debug('level %s' % res.level)
+
+class sinkAction(argparse.Action):
+ """Handle sink flag.
+
+ We expect only one sinkAction to ever be executed. We recommend using
+ mutually_exclusive_group's.
+
+ Besides the normal action flags, we require the following named argument:
+
+ -- which='foo'. Since all sink flags use res.sink, this specifies the key
+ of the entry for this flag.
+ """
+
+ def __init__(self, option_strings, dest, **kwargs):
+ self.which = kwargs['which']
+ del kwargs['which']
+ super(sinkAction, self).__init__(option_strings, dest, **kwargs)
+
+ def __call__(self, parser, res, new_value, option_string):
+ res.sink = self.construct_value(new_value)
+
+ logging.debug(res.sink)
+
+ def construct_value(self, new_value):
+ if self.which == 'output-file':
+ return {self.which: new_value[::-1].replace('.','/',1)[::-1]}
+ elif self.which == 'output-pill':
+ return {self.which: new_value[::-1].replace('.','/',1)[::-1]}
+ else:
+ return {self.which: new_value}
+
+class FullPaths(argparse.Action):
+ """Expand user- and relative-paths"""
+ def __call__(self, parser, namespace, values, option_string=None):
+ if values != None:
+ path = os.path.abspath(os.path.expanduser(values))
+ setattr(namespace, self.dest, path)
+
+def is_dir(dirname):
+ """Checks if a path is an actual directory"""
+ if not os.path.isdir(dirname):
+ msg = "{0} is not a directory".format(dirname)
+ raise argparse.ArgumentTypeError(msg)
+ else:
+ return dirname
+
+def get_args():
+ """Get CLI arguments and options"""
+ parser = argparse.ArgumentParser(description="""do something""")
+
+ parser.add_argument('alignments', help="The folder of alignments",
+ action=FullPaths, type=is_dir)
+
+parser = argparse.ArgumentParser(description='headless urbit')
+parser.add_argument('pier', nargs='?',
+ help='target urbit directory',
+ action=FullPaths, type=is_dir)
+parser.add_argument('-d', '--dojo', which='dojo',
+ metavar='command-line',
+ help='run dojo command',
+ action=sourceAction, dest='source')
+parser.add_argument('-D', '--data', which='data',
+ metavar='text',
+ help='literal text data',
+ action=sourceAction)
+parser.add_argument('-c', '--clay', which='clay',
+ metavar='clay-path',
+ help='load data from clay',
+ action=sourceAction)
+parser.add_argument('-u', '--url', which='url',
+ metavar='url',
+ help='pull data from url',
+ action=sourceAction)
+parser.add_argument('-a', '--api', which='api',
+ metavar='command',
+ help='get data from api',
+ action=sourceAction)
+parser.add_argument('-g', '--get-api', which='get-api',
+ metavar='api:endpoint',
+ help='get data from api endpoint',
+ action=sourceAction)
+parser.add_argument('-l', '--listen-api', which='listen-api',
+ metavar='api:event',
+ help='listen to event from api',
+ action=sourceAction)
+parser.add_argument('-m', '--mark', which='as',
+ metavar='mark',
+ help='transform a source to another mark',
+ nesting='mark',
+ action=transformerAction)
+parser.add_argument('-H', '--hoon', which='hoon',
+ metavar='code',
+ help='transform a source by hoon code',
+ nesting='code',
+ action=transformerAction)
+parser.add_argument('--open',
+ nargs=0,
+ help='start tuple',
+ action=openAction, dest='level')
+parser.add_argument('--close',
+ nargs=0,
+ help='stop tuple',
+ action=closeAction)
+
+sinks = parser.add_mutually_exclusive_group()
+sinks.add_argument('-s', '--stdout', const={'stdout': None},
+ default={'stdout': None},
+ action='store_const', dest='sink')
+sinks.add_argument('-f', '--output-file', which='output-file',
+ metavar='path',
+ action=sinkAction)
+sinks.add_argument('-P', '--output-pill', which='output-pill',
+ metavar='path',
+ action=sinkAction)
+sinks.add_argument('-C', '--output-clay', which='output-clay',
+ metavar='clay-path',
+ action=sinkAction)
+sinks.add_argument('-U', '--output-url', which='url',
+ metavar='url',
+ action=sinkAction)
+sinks.add_argument('-t', '--to-api', which='to-api',
+ metavar='api-command',
+ action=sinkAction)
+sinks.add_argument('-n', '--send-api', which='send-api',
+ metavar='api:endpoint',
+ action=sinkAction)
+sinks.add_argument('-x', '--command', which='command',
+ metavar='command',
+ action=sinkAction)
+sinks.add_argument('-p', '--app', which='app',
+ metavar='app',
+ action=sinkAction)
+
+
+args = parser.parse_args(args)
+
+
+if args.source is None:
+ args.source = {"data": ''.join(sys.stdin)}
+
+payload = {"source": args.source, "sink": args.sink}
+logging.debug(['payload', json.dumps(payload)])
+
+PORT = ""
+
+if args.pier is None:
+ PORT = os.environ.get('LENS_PORT', 12323)
+ # if 'LENS_PORT' not in os.environ:
+ # logging.warning("No pier or port specified, looking on port " + str(PORT))
+else:
+ with open(os.path.join(args.pier, ".http.ports")) as ports:
+ for line in ports:
+ if -1 != line.find("loopback"):
+ PORT = line.split()[0]
+ logging.info("Found port %s" % PORT)
+ break
+
+ if not PORT:
+ logging.error("Error reading port from .http.ports file")
+ sys.exit(1)
+
+url = "http://localhost:%s" % PORT
+
+r = requests.post(url, data=json.dumps(payload))
+
+if r.text[0] == '"':
+ # Python 3: decode unicode escape sequences
+ print(r.text[1:-1].encode().decode('unicode_escape'))
+elif r.text[0] == '{':
+ # print(r.text)
+ json_data = json.loads(r.text)
+ logging.debug(json_data)
+ with open(json_data['file'][:0:-1].replace('/','.',1)[::-1], 'wb') as f:
+ f.write(base64.b64decode(json_data['data']))
+else:
+ logging.warning("unrecognized response")
+ print(r.text)