"def select_up(self): r, c = self._index self._select_index(r-1, c) "
"def select_down(self): r, c = self._index self._select_index(r+1, c) "
"def select_left(self): r, c = self._index self._select_index(r, c-1) "
"def select_right(self): r, c = self._index self._select_index(r, c+1) "
"def write(self, filename): txt = self.tostring() with open(filename, 'w') as f: f.write(txt) "
"def find_args(self): return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \ self.program + self.program_args "
"def start(self, n): self.n = n return super(MPILauncher, self).start() "
"def _append_jpg(self, jpg, before_prompt=False): self._append_custom(self._insert_jpg, jpg, before_prompt) "
"def _append_png(self, png, before_prompt=False): self._append_custom(self._insert_png, png, before_prompt) "
"def _append_svg(self, svg, before_prompt=False): self._append_custom(self._insert_svg, svg, before_prompt) "
"def beforeContext(self): mods = sys.modules.copy() self._mod_stack.append(mods) "
"def get_system_cpu_times(): user, nice, system, idle = _psutil_osx.get_system_cpu_times() return _cputimes_ntuple(user, nice, system, idle) "
"def memoize(f): cache = {} def memf(*x): if x not in cache: cache[x] = f(*x) return cache[x] return memf "
"def new_qt_console(self, evt=None): return connect_qtconsole(self.ipkernel.connection_file, profile=self.ipkernel.profile) "
"def abort(self): assert not self.ready(), "Can't abort, I am already done!" return self._client.abort(self.msg_ids, targets=self._targets, block=True) "
"def loop_gtk(kernel): from .gui.gtkembed import GTKEmbed gtk_kernel = GTKEmbed(kernel) gtk_kernel.start() "
"def GOE(N): m = ra.standard_normal((N, N)) m += m.T return m/2 "
"def read_file(self, filename): self.lines, self.arcs = self._read_file(filename) "
"def info(self): return (self.identity, self.url, self.pub_url, self.location) "
"def add_section(self): sect = CodeBuilder(self.indent_amount) self.code.append(sect) return sect "
"def user_config_files(): return filter(os.path.exists, map(os.path.expanduser, config_files)) "
"def remove_task(message): task = Task.objects.get(pk=message['id']) task.delete() "
"def patch_if_missing(obj, name, method): setattr(obj, name, getattr(obj, name, method)) "
"def add(self, func, priority=0): self.chain.append((priority, func)) self.chain.sort(key=lambda x: x[0]) "
"def render_template(content, context): rendered = Template(content).render(Context(context)) return rendered "
"def configure(self, options, conf): self.conf = conf if not options.capture: self.enabled = False "
"def splitBy(data, num): return [data[i:i + num] for i in range(0, len(data), num)] "
"def _num_cpus_darwin(): p = subprocess.Popen(['sysctl', '-n', 'hw.ncpu'], stdout=subprocess.PIPE) return "
"def connect(com, peers, tree, pub_url, root_id): com.connect(peers, tree, pub_url, root_id) "
"def init_session(self): default_secure(self.config) self.session = Session(config=self.config, username=u'kernel') "
"def html_to_text(content): text = None h2t = html2text.HTML2Text() h2t.ignore_links = False text = h2t.handle(content) return text "
"def md_to_text(content): text = None html = markdown.markdown(content) if html: text = html_to_text(content) return text "
"def initialize(self, argv=None): super(BaseParallelApplication, self).initialize(argv) self.to_work_dir() self.reinit_logging() "
"def unregister_transformer(self, transformer): if transformer in self._transformers: self._transformers.remove(transformer) "
"def register_checker(self, checker): if checker not in self._checkers: self._checkers.append(checker) self.sort_checkers() "
"def unregister_checker(self, checker): if checker in self._checkers: self._checkers.remove(checker) "
"def enterEvent(self, event): super(CallTipWidget, self).enterEvent(event) self._hide_timer.stop() "
"def already_used(self, tok): if tok in self.jwts: return True self.jwts[tok] = time.time() return False "
"def _append_html(self, html, before_prompt=False): self._append_custom(self._insert_html, html, before_prompt) "
"def file_read(filename): fobj = open(filename, 'r') source = fobj.close() return source "
"def close(self): self.flush() setattr(sys,, self.ostream) self.file.close() self._closed = True "
"def write(self, data): self.file.write(data) self.ostream.write(data) self.ostream.flush() "
"def add_new_heart_handler(self, handler): self.log.debug("heartbeat::new_heart_handler: %s", handler) self._new_handlers.add(handler) "
"def _writable_dir(path): return os.path.isdir(path) and os.access(path, os.W_OK) "
"def get_ipython_package_dir(): ipdir = os.path.dirname(IPython.__file__) return py3compat.cast_unicode(ipdir, fs_encoding) "
"def wave_saver(u, x, y, t): global u_hist global t_hist t_hist.append(t) u_hist.append(1.0*u) "
"def get_pid_list(): pids = [int(x) for x in os.listdir('/proc') if x.isdigit()] return pids "
"def short_stack(): stack = inspect.stack()[:0:-1] return "\n".join(["%30s : %s @%d" % (t[3], t[1], t[2]) for t in stack]) "
"def data(fname): data_file = open(data_filename(fname)) try: return finally: data_file.close() "
"def chop(seq, size): def chunk(i): return seq[i:i+size] return map(chunk, xrange(0, len(seq), size)) "
"def file_matches(filename, patterns): return any(fnmatch.fnmatch(filename, pat) for pat in patterns) "
"def create_hb_stream(self, kernel_id): self._check_kernel_id(kernel_id) return super(MappingKernelManager, self).create_hb_stream(kernel_id) "
"def spin_after(f, self, *args, **kwargs): ret = f(self, *args, **kwargs) self.spin() return ret "
"def get_msg(self, block=True, timeout=None): "Gets a message if there is one that is ready." return self._in_queue.get(block, timeout) "
"def depth(n, tree): d = 0 parent = tree[n] while parent is not None: d += 1 parent = tree[parent] return d "
"def print_bintree(tree, indent=' '): for n in sorted(tree.keys()): print("%s%s" % (indent * depth(n, tree), n)) "
"def allreduce(self, f, value, flat=True): return self.reduce(f, value, flat=flat, all=True) "
"def report(self, morfs, directory=None): self.report_files(self.annotate_file, morfs, directory) "
"def object_info(**kw): infodict = dict(izip_longest(info_fields, [None])) infodict.update(kw) return infodict "
"def resume(self): for tracer in self.tracers: tracer.start() threading.settrace(self._installation_trace) "
"def _canonical_dir(self, morf): return os.path.split(CodeUnit(morf, self.file_locator).filename)[0] "
"def _warn(self, msg): self._warnings.append(msg) sys.stderr.write(" warning: %s\n" % msg) "
"def _atexit(self): if self._started: self.stop() if self.auto_data: "
"def analysis(self, morf): f, s, _, m, mf = self.analysis2(morf) return f, s, m, mf "
"def get_domain(url): if 'http' not in url.lower(): url = 'http://{}'.format(url) return urllib.parse.urlparse(url).hostname "
"def get_url_args(url): url_data = urllib.parse.urlparse(url) arg_dict = urllib.parse.parse_qs(url_data.query) return arg_dict "
"def save_policy(self, path): with open(path, 'wb') as f: pickle.dump(self.policy, f) "
"def copy_obs_dict(obs): return {k: np.copy(v) for k, v in obs.items()} "
"def sf01(arr): s = arr.shape return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:]) "
"def terminate(self): if self._pool is not None: self._pool.terminate() self._pool.join() self._pool = None "
"def height(self): if len(self.coords) <= 1: return 0 return np.max(self.yy) - np.min(self.yy) "
"def width(self): if len(self.coords) <= 1: return 0 return np.max(self.xx) - np.min(self.xx) "
"def log_if(level, msg, condition, *args): if condition: vlog(level, msg, *args) "
"def state_size(self): return (LSTMStateTuple(self._num_units, self._num_units) if self._state_is_tuple else 2 * self._num_units) "
"def word_to_id(self, word): if word in self._vocab: return self._vocab[word] else: return self._unk_id "
"def word_to_id(self, word): if word in self.vocab: return self.vocab[word] else: return self.unk_id "
"def getInputNames(self): inputs = self.getSpec().inputs return [inputs.getByIndex(i)[0] for i in xrange(inputs.getCount())] "
"def getOutputNames(self): outputs = self.getSpec().outputs return [outputs.getByIndex(i)[0] for i in xrange(outputs.getCount())] "
"def getVersion(): with open(os.path.join(REPO_DIR, "VERSION"), "r") as versionFile: return "
"def addValuesToField(self, i, numValues): assert (len(self.fields) > i) values = [self.addValueToField(i) for n in range(numValues)] return values "
"def getTotaln(self): n = sum([field.n for field in self.fields]) return n "
"def getTotalw(self): w = sum([field.w for field in self.fields]) return w "
"def getData(self, n): records = [self.getNext() for x in range(n)] return records "
"def clean(s): lines = [l.rstrip() for l in s.split('\n')] return '\n'.join(lines) "
"def addInstance(self, groundTruth, prediction, record=None, result=None): self.value = self.avg(prediction) "
"def _setPath(cls): cls._path = os.path.join(os.environ['NTA_DYNAMIC_CONF_DIR'], cls.customFileName) "
"def __advancePhase(self): self.__currentPhase = self.__currentPhase.enterPhase() return "
"def _reportCommandLineUsageErrorAndExit(parser, message): print(parser.get_usage()) print(message) sys.exit(1) "
"def bitsToString(arr): s = array('c', '.'*len(arr)) for i in xrange(len(arr)): if arr[i] == 1: s[i] = '*' return s "
"def quote(c): i = ord(c) return ESCAPE + HEX[i//16] + HEX[i % 16] "
"def time(self): "Return the time part, with tzinfo None." return time(self.hour, self.minute, self.second, self.microsecond) "
"def timetz(self): "Return the time part, with same tzinfo." return time(self.hour, self.minute, self.second, self.microsecond, self._tzinfo) "
"def countOf(a, b): "Return the number of times b occurs in a." count = 0 for i in a: if i == b: count += 1 return count "
"def free_temp(self, v): self.used_temps.remove(v) self.free_temps.add(v) "
"def _dump(self, tag, x, lo, hi): for i in xrange(lo, hi): yield '%s %s' % (tag, x[i]) "
"def pformat(o, indent=1, width=80, depth=None): return PrettyPrinter(indent=indent, width=width, depth=depth).pformat(o) "
"def isfile(path): try: st = os.stat(path) except os.error: return False return stat.S_ISREG(st.st_mode) "
"def isdir(s): try: st = os.stat(s) except os.error: return False return stat.S_ISDIR(st.st_mode) "
"def remove(self, value): if value not in self: raise KeyError(value) self.discard(value) "
"def output_image_link(self, m): return self.renderer.image_link('url'),'target'),'alt')) "

FloCo Dataset


We introduce "FloCo", a new large-scale dataset for converting flowchart images to Python code. It contains 11,884 paired flowchart-code samples. Please refer to the paper for details on the dataset statistics and construction.

@inproceedings{shukla2023floco,
  author    = "Shukla, Shreya and
               Gatti, Prajwal and
               Kumar, Yogesh and
               Yadav, Vikash and
               Mishra, Anand",
  title     = "Towards Making Flowchart Images Machine Interpretable",
  booktitle = "ICDAR",
  year      = "2023",
}
