Dataset Preview
Viewer
The full dataset viewer is not available (click to read why). Only showing a preview of the rows.
Please pip install lz4
Error code:   UnexpectedError

Need help to make the dataset viewer work? Open a discussion for direct support.

signature (string)body (string)docstring (string)id (string)
"def pairwise(iterable):"
"a, b = tee(iterable)<EOL>next(b, None)<EOL>return zip(a, b)<EOL>"
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
"f3:m0"
"def score_meaning(text):"
"<EOL>all_characters = re.findall('<STR_LIT>', text) <EOL>if len(all_characters) == <NUM_LIT:0>:<EOL><INDENT>return <NUM_LIT:0><EOL><DEDENT>repetition_count = Counter(all_characters)<EOL>score = (len(all_characters)) ** <NUM_LIT:2> / (len(repetition_count) + len(text) / <NUM_LIT>)<EOL>return score<EOL>"
"Returns a score in [0,1] range if the text makes any sense in English."
"f11:m0"
"def get_top_n_meanings(strings, n):"
"scored_strings = [(s, score_meaning(s)) for s in strings]<EOL>scored_strings.sort(key=lambda tup: -tup[<NUM_LIT:1>])<EOL>return scored_strings[:n]<EOL>"
"Returns (text, score) for top n strings"
"f11:m1"
"def _make_methods():"
"for k, v in PokeAPI().get_endpoints().items():<EOL><INDENT>string = "<STR_LIT>"<EOL>string += ("<STR_LIT>"<EOL>.format(k.replace('<STR_LIT:->', '<STR_LIT:_>')) + '<STR_LIT>')<EOL>string += ("<STR_LIT>" +<EOL>"<STR_LIT>")<EOL>string += "<STR_LIT>".format(v.split('<STR_LIT:/>')[-<NUM_LIT:2>])<EOL>string += "<STR_LIT>"<EOL>string += '<STR_LIT>'<EOL>string += '<STR_LIT>'<EOL>print(string)<EOL><DEDENT>"
"Automagically generates methods based on the API endpoints"
"f17:m0"
"def path(self, name):"
"return self.url<EOL>"
"Returns a local filesystem path where the file can be retrieved using Python's built-in open() function. Storage systems that can't be accessed using open() should *not* implement this method."
"f19:c1:m1"
"def open(self, name, mode='<STR_LIT:rb>'):"
"self.request = urlopen(self.url)<EOL>if self.algorithm:<EOL><INDENT>self.hash = hashlib.new(self.algorithm)<EOL><DEDENT>return self<EOL>"
"Retrieves the specified file from storage."
"f19:c1:m2"
"def list(self, ignore_patterns):"
"return six.iteritems(self.firsts)<EOL>"
"List all files in all storages."
"f19:c2:m2"
"def find(self, path, all=False):"
"found = os.path.join(settings.STATIC_ROOT, path)<EOL>if all:<EOL><INDENT>return [found]<EOL><DEDENT>else:<EOL><INDENT>return found<EOL><DEDENT>"
"Looks for files in the app directories."
"f19:c2:m3"
"def give_unexpected_calls(method_calls, expected_methods_names):"
"return [call for call in method_calls<EOL>if call[<NUM_LIT:0>] not in expected_methods_names]<EOL>"
"TODO: Move this to a common test utils module."
"f28:m4"
"@classmethod<EOL><INDENT>def _unwrap_func(cls, decorated_func):<DEDENT>"
"if click is not None:<EOL><INDENT>if isinstance(decorated_func, click.Command):<EOL><INDENT>return cls._unwrap_func(decorated_func.callback)<EOL><DEDENT><DEDENT>if hasattr(decorated_func, '<STR_LIT>'):<EOL><INDENT>return cls._unwrap_func(decorated_func.__wrapped__)<EOL><DEDENT>else:<EOL><INDENT>return decorated_func<EOL><DEDENT>"
"This unwraps a decorated func, returning the inner wrapped func. This may become unnecessary with Python 3.4's inspect.unwrap()."
"f31:c0:m1"
"def _register_dependent(self, dependent, resource_name):"
"if dependent not in self.dependents:<EOL><INDENT>self.dependents[dependent] = []<EOL><DEDENT>self.dependents[dependent].insert(<NUM_LIT:0>, resource_name)<EOL>"
"Register a mapping of the dependent to resource name. After calling, dependency_register.dependents[dependent] should contain resource_name."
"f31:c0:m3"
"def register(self, resource_name, dependent=None):"
"if dependent is None:<EOL><INDENT>return partial(self.register, resource_name)<EOL><DEDENT>dependent = self._unwrap_dependent(dependent)<EOL>self._register_dependent(dependent, resource_name)<EOL>self._register_resource_dependency(resource_name, dependent)<EOL>return dependent<EOL>"
"Register the given dependent as depending on the "resource" named by resource_name."
"f31:c0:m5"
"@di.dependsOn('<STR_LIT>')<EOL>def multiply(n):"
"multiplier = di.resolver.unpack(multiply)<EOL>return multiplier * n<EOL>"
"Multiply the given number n by some configured multiplier."
"f36:m0"
"@providers.register('<STR_LIT>')<EOL>def give_multiplier():"
"return <NUM_LIT:2><EOL>"
"Give a multiplier of 2."
"f36:m1"
"@di.dependsOn('<STR_LIT>')<EOL>@di.dependsOn('<STR_LIT>')<EOL>def multiply_and_add(n):"
"multiplier, offset = di.resolver.unpack(multiply_and_add)<EOL>return (multiplier * n) + offset<EOL>"
"Multiply the given number n by some configured multiplier, and then add a configured offset."
"f37:m0"
"@providers.register('<STR_LIT>')<EOL>def give_multiplier():"
"return <NUM_LIT:2><EOL>"
"Give a multiplier of 2."
"f37:m1"
"@providers.register('<STR_LIT>')<EOL>def give_offset():"
"return <NUM_LIT:3><EOL>"
"Give an offset value of 3."
"f37:m2"
"def __enter__(self):"
"return self<EOL>"
":return: self"
"f39:c0:m1"
"def _debug_info(self):"
"self._msg('<STR_LIT>')<EOL>self._msg2('<STR_LIT>'.format(self._curdir))<EOL>self._msg2('<STR_LIT>'.format(self._session.cookies))<EOL>self._msg2('<STR_LIT>'.format(self._session.headers))<EOL>self._msg2('<STR_LIT>'.format(self._config))<EOL>self._msg2('<STR_LIT>'.format(self._custom))<EOL>self._msg2('<STR_LIT>'.format(self._account))<EOL>"
"Show a list of recently variables info."
"f39:c2:m1"
"def register(self, argtypes=r'<STR_LIT:M>', help_msg=None):"
"def format_args(method):<EOL><INDENT>def wrapped_method(*args, **kwargs):<EOL><INDENT>args_count = len(args) <EOL>argtypes_count = len(argtypes)<EOL>placeholder_count = argtypes.count('<STR_LIT:H>') + argtypes.count('<STR_LIT:h>')<EOL>if placeholder_count:<EOL><INDENT>min_args_count = (argtypes_count - placeholder_count)<EOL>if args_count < min_args_count or args_count > argtypes_count:<EOL><INDENT>raise KngetError("<STR_LIT>",<EOL>reason='<STR_LIT>'.format(args_count))<EOL><DEDENT><DEDENT>elif args_count != argtypes_count:<EOL><INDENT>raise KngetError("<STR_LIT>",<EOL>reason='<STR_LIT>'.format(args_count))<EOL><DEDENT>argv = [] <EOL>for i in range(args_count):<EOL><INDENT>if argtypes[i] in ('<STR_LIT:m>', '<STR_LIT:M>'):<EOL><INDENT>argv.append(args[i])<EOL><DEDENT>elif argtypes[i] in ('<STR_LIT:i>', '<STR_LIT:I>'):<EOL><INDENT>argv.append(int(args[i]))<EOL><DEDENT>elif argtypes[i] in ('<STR_LIT:s>', '<STR_LIT:S>'):<EOL><INDENT>argv.append(str(args[i]))<EOL><DEDENT>elif argtypes[i] in ('<STR_LIT:h>', '<STR_LIT:H>'):<EOL><INDENT>argv.append(args[i])<EOL><DEDENT>else:<EOL><INDENT>raise KngetError('<STR_LIT>'.format(argtypes[i]))<EOL><DEDENT><DEDENT>return method(*argv, **kwargs)<EOL><DEDENT>wrapped_method.__doc__ = method.__doc__<EOL>self._commands[method.__name__] = (<EOL>wrapped_method, help_msg<EOL>)<EOL>return wrapped_method<EOL><DEDENT>return format_args<EOL>"
"Register a method to a command. NOTE: Method registered here is unbound method, e.g. registered `run` command -> `KngetShell.run` So we call it should add `self` at first. See also: KngetShell.execute() :param argtypes: a str of the command args type. M: Myself -> self S: String -> str I: Integer -> int H: placeHolder -> pass or anything :param help_msg: a short help string of commands. :return: a callable function or method."
"f39:c3:m2"
"@command.register(argtypes=r'<STR_LIT>', help_msg="<STR_LIT>")<EOL><INDENT>def run(self, tags, begin, end=False):<DEDENT>"
"if not end:<EOL><INDENT>end = begin<EOL><DEDENT>super(KngetShell, self).run(tags, begin, int(end))<EOL>"
"Override method of class Knget"
"f39:c4:m1"
"@command.register(argtypes=r'<STR_LIT:M>', help_msg="<STR_LIT>")<EOL><INDENT>def debug(self):<DEDENT>"
"self._debug_info()<EOL>"
"Override method of `Knget._debug_info()`"
"f39:c4:m6"
"@command.register(argtypes=r'<STR_LIT>', help_msg="<STR_LIT>")<EOL><INDENT>def dbgrun(self, source):<DEDENT>"
"try:<EOL><INDENT>exec(source)<EOL><DEDENT>except Exception as e:<EOL><INDENT>self._msg2('<STR_LIT>'.format(e))<EOL><DEDENT>"
"Debug run. based on exec(), unsafe."
"f39:c4:m11"
"def load_key(pubkey):"
"try:<EOL><INDENT>return load_pem_public_key(pubkey.encode(), default_backend())<EOL><DEDENT>except ValueError:<EOL><INDENT>pubkey = pubkey.replace('<STR_LIT>', '<STR_LIT>').replace('<STR_LIT>', '<STR_LIT>')<EOL>return load_pem_public_key(pubkey.encode(), default_backend())<EOL><DEDENT>"
"Load public RSA key, with work-around for keys using incorrect header/footer format. Read more about RSA encryption with cryptography: https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/"
"f47:m0"
"def encrypt(pubkey, password):"
"key = load_key(pubkey)<EOL>encrypted_password = key.encrypt(password, PKCS1v15())<EOL>return base64.b64encode(encrypted_password)<EOL>"
"Encrypt password using given RSA public key and encode it with base64. The encrypted password can only be decrypted by someone with the private key (in this case, only Travis)."
"f47:m1"
"def fetch_public_key(repo):"
"keyurl = '<STR_LIT>'.format(repo)<EOL>data = json.loads(urlopen(keyurl).read().decode())<EOL>if '<STR_LIT:key>' not in data:<EOL><INDENT>errmsg = "<STR_LIT>".format(repo)<EOL>errmsg += "<STR_LIT>"<EOL>raise ValueError(errmsg)<EOL><DEDENT>return data['<STR_LIT:key>']<EOL>"
"Download RSA public key Travis will use for this repo. Travis API docs: http://docs.travis-ci.com/api/#repository-keys"
"f47:m2"
"def prepend_line(filepath, line):"
"with open(filepath) as f:<EOL><INDENT>lines = f.readlines()<EOL><DEDENT>lines.insert(<NUM_LIT:0>, line)<EOL>with open(filepath, '<STR_LIT:w>') as f:<EOL><INDENT>f.writelines(lines)<EOL><DEDENT>"
"Rewrite a file adding a line to its beginning."
"f47:m3"
"def update_travis_deploy_password(encrypted_password):"
"config = load_yaml_config(TRAVIS_CONFIG_FILE)<EOL>config['<STR_LIT>']['<STR_LIT:password>'] = dict(secure=encrypted_password)<EOL>save_yaml_config(TRAVIS_CONFIG_FILE, config)<EOL>line = ('<STR_LIT>'<EOL>'<STR_LIT>')<EOL>prepend_line(TRAVIS_CONFIG_FILE, line)<EOL>"
"Update the deploy section of the .travis.yml file to use the given encrypted password."
"f47:m6"
"def tokenize_words(string):"
"string = six.text_type(string)<EOL>return re.findall(WORD_TOKENIZATION_RULES, string)<EOL>"
"Tokenize input text to words. :param string: Text to tokenize :type string: str or unicode :return: words :rtype: list of strings"
"f50:m0"
"def tokenize_sents(string):"
"string = six.text_type(string)<EOL>spans = []<EOL>for match in re.finditer('<STR_LIT>', string):<EOL><INDENT>spans.append(match)<EOL><DEDENT>spans_count = len(spans)<EOL>rez = []<EOL>off = <NUM_LIT:0><EOL>for i in range(spans_count):<EOL><INDENT>tok = string[spans[i].start():spans[i].end()]<EOL>if i == spans_count - <NUM_LIT:1>:<EOL><INDENT>rez.append(string[off:spans[i].end()])<EOL><DEDENT>elif tok[-<NUM_LIT:1>] in ['<STR_LIT:.>', '<STR_LIT:!>', '<STR_LIT:?>', '<STR_LIT>', '<STR_LIT>']:<EOL><INDENT>tok1 = tok[re.search('<STR_LIT>', tok).start()-<NUM_LIT:1>]<EOL>next_tok = string[spans[i + <NUM_LIT:1>].start():spans[i + <NUM_LIT:1>].end()]<EOL>if (next_tok[<NUM_LIT:0>].isupper()<EOL>and not tok1.isupper()<EOL>and not (tok[-<NUM_LIT:1>] != '<STR_LIT:.>'<EOL>or tok1[<NUM_LIT:0>] == '<STR_LIT:(>'<EOL>or tok in ABBRS)):<EOL><INDENT>rez.append(string[off:spans[i].end()])<EOL>off = spans[i + <NUM_LIT:1>].start()<EOL><DEDENT><DEDENT><DEDENT>return rez<EOL>"
"Tokenize input text to sentences. :param string: Text to tokenize :type string: str or unicode :return: sentences :rtype: list of strings"
"f50:m1"
"def tokenize_text(string):"
"string = six.text_type(string)<EOL>rez = []<EOL>for part in string.split('<STR_LIT:\n>'):<EOL><INDENT>par = []<EOL>for sent in tokenize_sents(part):<EOL><INDENT>par.append(tokenize_words(sent))<EOL><DEDENT>if par:<EOL><INDENT>rez.append(par)<EOL><DEDENT><DEDENT>return rez<EOL>"
"Tokenize input text to paragraphs, sentences and words. Tokenization to paragraphs is done using simple Newline algorithm For sentences and words tokenizers above are used :param string: Text to tokenize :type string: str or unicode :return: text, tokenized into paragraphs, sentences and words :rtype: list of list of list of words"
"f50:m2"
"def crypt(header, body_bytes, secret):"
"<EOL><INDENT>unsigned char<EOL><DEDENT>= network-order (big-endian) unsigned int<EOL>length = len(body_bytes)<EOL>hed = (<EOL>truct.pack('<STR_LIT>', header.session_id) +<EOL>ix.b(secret) +<EOL>truct.pack('<STR_LIT:B>', header.version) +<EOL>truct.pack('<STR_LIT:B>', header.seq_no)<EOL>hashed = md5(unhashed).digest()<EOL>en(pad) < body_length):<EOL><INDENT>remake hash, appending it to pad until pad >= header.length<EOL><DEDENT>hile True:<EOL><INDENT>hashed = md5(unhashed + hashed).digest()<EOL>pad += hashed<EOL>if len(pad) >= body_length:<EOL><INDENT>break<EOL>"
"TACACS+ uses a shared secret key (known to both the client and server) to obfuscate the body of sent packets. Only the packet body (not the header) is obfuscated. https://datatracker.ietf.org/doc/draft-ietf-opsawg-tacacs/?include_text=1#section-3.7 ENCRYPTED {data} == data ^ pseudo_pad The pad is generated by concatenating a series of MD5 hashes (each 16 bytes long) and truncating it to the length of the input data. pseudo_pad = {MD5_1 [,MD5_2 [ ... ,MD5_n]]} truncated to len(data) The first MD5 hash is generated by concatenating the session_id, the secret key, the version number and the sequence number and then running MD5 over that stream. All of those input values are available in the packet header, except for the secret key which is a shared secret between the TACACS+ client and server. Subsequent hashes are generated by using the same input stream, but concatenating the previous hash value at the end of the input stream. MD5_1 = MD5{session_id, key, version, seq_no} MD5_2 = MD5{session_id, key, version, seq_no, MD5_1} .... MD5_n = MD5{session_id, key, version, seq_no, MD5_n-1} :param header: a TACACSHeader object :param body_bytes: packed bytes, i.e., `struct.pack(...)` :param secret: a key used to encrypt/obfuscate packets according to the TACACS+ spec :return: packed bytes, i.e., `struct.pack(...)` representing the obfuscated packet body"
"f53:m0"
"def __init__(self, header, body_bytes, secret):"
"self.header = header<EOL>self.body_bytes = body_bytes<EOL>self.secret = secret<EOL>"
":param header: a TACACSHeader object :param body_bytes: packed bytes, i.e., `struct.pack(...)` :param secret: a key used to encrypt/obfuscate packets according to the TACACS+ spec"
"f53:c0:m0"
"def __init__(self, host, port, secret, timeout=<NUM_LIT:10>, session_id=None,<EOL>family=socket.AF_INET, version_max=TAC_PLUS_MAJOR_VER,<EOL>version_min=TAC_PLUS_MINOR_VER):"
"self._sock = None<EOL>self.host = host<EOL>self.port = port<EOL>self.secret = secret<EOL>self.timeout = timeout<EOL>self.version_max = version_max<EOL>self.version_min = version_min<EOL>self.family = family<EOL>self.session_id = session_id or random.randint(<NUM_LIT:1>, <NUM_LIT:2> ** <NUM_LIT:32> - <NUM_LIT:1>)<EOL>"
":param host: hostname of the TACACS+ server :param port: port of the TACACS+ server, generally 49 :param secret: the secret key used to obfuscate packet bodies; can be `None` to disable packet body obfuscation :param session_id: a unique 32-bit int representing the session; if left empty, one will be auto-generated :param version_max: TACACS+ major version number, 12 :param version_min: TACACS+ minor version number, 0 or 1"
"f55:c0:m0"
"def send(self, body, req_type, seq_no=<NUM_LIT:1>):"
"<EOL>header = TACACSHeader(<EOL>self.version,<EOL>req_type,<EOL>self.session_id,<EOL>len(body.packed),<EOL>seq_no=seq_no<EOL>)<EOL>packet = TACACSPacket(header, body.packed, self.secret)<EOL>logger.debug('<STR_LIT:\n>'.join([<EOL>body.__class__.__name__,<EOL>'<STR_LIT>' % header,<EOL>'<STR_LIT>' % body,<EOL>]))<EOL>self.sock.send(bytes(packet))<EOL>readable, _, _ = select.select([self.sock], [], [], self.timeout)<EOL>if readable:<EOL><INDENT>header_bytes = self.sock.recv(<NUM_LIT:12>)<EOL>resp_header = TACACSHeader.unpacked(header_bytes)<EOL>if any([<EOL>resp_header.version_max != header.version_max,<EOL>resp_header.type != header.type,<EOL>resp_header.session_id != header.session_id<EOL>]):<EOL><INDENT>logger.error('<STR_LIT:\n>'.join([<EOL>resp_header.__class__.__name__,<EOL>'<STR_LIT>' % resp_header,<EOL>str(resp_header.packed)<EOL>]))<EOL>raise socket.error<EOL><DEDENT>body_bytes = six.b('<STR_LIT>')<EOL>remaining = resp_header.length<EOL>while remaining > <NUM_LIT:0>:<EOL><INDENT>body_bytes += self.sock.recv(remaining)<EOL>remaining = resp_header.length - len(body_bytes)<EOL><DEDENT>return TACACSPacket(<EOL>resp_header,<EOL>body_bytes,<EOL>self.secret<EOL>)<EOL><DEDENT>raise socket.timeout<EOL>"
"Send a TACACS+ message body :param body: packed bytes, i.e., `struct.pack(...)` :param req_type: TAC_PLUS_AUTHEN, TAC_PLUS_AUTHOR, TAC_PLUS_ACCT :param seq_no: The sequence number of the current packet. The first packet in a session MUST have the sequence number 1 and each subsequent packet will increment the sequence number by one. Thus clients only send packets containing odd sequence numbers, and TACACS+ servers only send packets containing even sequence numbers. :return: TACACSPacket :raises: socket.timeout, socket.error"
"f55:c0:m4"
"def authenticate(self, username, password, priv_lvl=TAC_PLUS_PRIV_LVL_MIN,<EOL>authen_type=TAC_PLUS_AUTHEN_TYPE_ASCII,<EOL>chap_ppp_id=None, chap_challenge=None,<EOL>rem_addr=TAC_PLUS_VIRTUAL_REM_ADDR, port=TAC_PLUS_VIRTUAL_PORT):"
"start_data = six.b('<STR_LIT>')<EOL>if authen_type in (TAC_PLUS_AUTHEN_TYPE_PAP,<EOL>TAC_PLUS_AUTHEN_TYPE_CHAP):<EOL><INDENT>self.version_min = TAC_PLUS_MINOR_VER_ONE<EOL>if authen_type == TAC_PLUS_AUTHEN_TYPE_PAP:<EOL><INDENT>start_data = six.b(password)<EOL><DEDENT>if authen_type == TAC_PLUS_AUTHEN_TYPE_CHAP:<EOL><INDENT>if not isinstance(chap_ppp_id, six.string_types):<EOL><INDENT>raise ValueError('<STR_LIT>')<EOL><DEDENT>if len(chap_ppp_id) != <NUM_LIT:1>:<EOL><INDENT>raise ValueError('<STR_LIT>')<EOL><DEDENT>if not isinstance(chap_challenge, six.string_types):<EOL><INDENT>raise ValueError('<STR_LIT>')<EOL><DEDENT>if len(chap_challenge) > <NUM_LIT:255>:<EOL><INDENT>raise ValueError('<STR_LIT>')<EOL><DEDENT>start_data = (<EOL>six.b(chap_ppp_id) +<EOL>six.b(chap_challenge) +<EOL>md5(six.b(<EOL>chap_ppp_id + password + chap_challenge<EOL>)).digest()<EOL>)<EOL><DEDENT><DEDENT>with self.closing():<EOL><INDENT>packet = self.send(<EOL>TACACSAuthenticationStart(username, authen_type, priv_lvl,<EOL>start_data, rem_addr=rem_addr, port=port),<EOL>TAC_PLUS_AUTHEN<EOL>)<EOL>reply = TACACSAuthenticationReply.unpacked(packet.body)<EOL>logger.debug('<STR_LIT:\n>'.join([<EOL>reply.__class__.__name__,<EOL>'<STR_LIT>' % packet.header,<EOL>'<STR_LIT>' % reply<EOL>]))<EOL>if authen_type == TAC_PLUS_AUTHEN_TYPE_ASCII and reply.getpass:<EOL><INDENT>packet = self.send(TACACSAuthenticationContinue(password),<EOL>TAC_PLUS_AUTHEN,<EOL>packet.seq_no + <NUM_LIT:1>)<EOL>reply = TACACSAuthenticationReply.unpacked(packet.body)<EOL>logger.debug('<STR_LIT:\n>'.join([<EOL>reply.__class__.__name__,<EOL>'<STR_LIT>' % packet.header,<EOL>'<STR_LIT>' % reply<EOL>]))<EOL>if reply.flags == TAC_PLUS_CONTINUE_FLAG_ABORT:<EOL><INDENT>reply.status = TAC_PLUS_AUTHEN_STATUS_FAIL<EOL><DEDENT><DEDENT><DEDENT>return reply<EOL>"
"Authenticate to a TACACS+ server with a username and password. :param username: :param password: :param priv_lvl: :param authen_type: TAC_PLUS_AUTHEN_TYPE_ASCII, TAC_PLUS_AUTHEN_TYPE_PAP, TAC_PLUS_AUTHEN_TYPE_CHAP :param chap_ppp_id: PPP ID when authen_type == 'chap' :param chap_challenge: challenge value when authen_type == 'chap' :param rem_addr: AAA request source, default to TAC_PLUS_VIRTUAL_REM_ADDR :param port: AAA port, default to TAC_PLUS_VIRTUAL_PORT :return: TACACSAuthenticationReply :raises: socket.timeout, socket.error"
"f55:c0:m5"
"def authorize(self, username, arguments=[],<EOL>authen_type=TAC_PLUS_AUTHEN_TYPE_ASCII, priv_lvl=TAC_PLUS_PRIV_LVL_MIN,<EOL>rem_addr=TAC_PLUS_VIRTUAL_REM_ADDR, port=TAC_PLUS_VIRTUAL_PORT):"
"with self.closing():<EOL><INDENT>packet = self.send(<EOL>TACACSAuthorizationStart(username,<EOL>TAC_PLUS_AUTHEN_METH_TACACSPLUS,<EOL>priv_lvl, authen_type, arguments,<EOL>rem_addr=rem_addr, port=port),<EOL>TAC_PLUS_AUTHOR<EOL>)<EOL>reply = TACACSAuthorizationReply.unpacked(packet.body)<EOL>logger.debug('<STR_LIT:\n>'.join([<EOL>reply.__class__.__name__,<EOL>'<STR_LIT>' % packet.header,<EOL>'<STR_LIT>' % reply<EOL>]))<EOL>reply_arguments = dict([<EOL>arg.split(six.b('<STR_LIT:=>'), <NUM_LIT:1>)<EOL>for arg in reply.arguments or []<EOL>if arg.find(six.b('<STR_LIT:=>')) > -<NUM_LIT:1>]<EOL>)<EOL>user_priv_lvl = int(reply_arguments.get(<EOL>six.b('<STR_LIT>'), TAC_PLUS_PRIV_LVL_MAX))<EOL>if user_priv_lvl < priv_lvl:<EOL><INDENT>reply.status = TAC_PLUS_AUTHOR_STATUS_FAIL<EOL><DEDENT><DEDENT>return reply<EOL>"
"Authorize with a TACACS+ server. :param username: :param arguments: The authorization arguments :param authen_type: TAC_PLUS_AUTHEN_TYPE_ASCII, TAC_PLUS_AUTHEN_TYPE_PAP, TAC_PLUS_AUTHEN_TYPE_CHAP :param priv_lvl: Minimal Required priv_lvl. :param rem_addr: AAA request source, default to TAC_PLUS_VIRTUAL_REM_ADDR :param port: AAA port, default to TAC_PLUS_VIRTUAL_PORT :return: TACACSAuthenticationReply :raises: socket.timeout, socket.error"
"f55:c0:m6"
"def account(self, username, flags, arguments=[],<EOL>authen_type=TAC_PLUS_AUTHEN_TYPE_ASCII, priv_lvl=TAC_PLUS_PRIV_LVL_MIN,<EOL>rem_addr=TAC_PLUS_VIRTUAL_REM_ADDR, port=TAC_PLUS_VIRTUAL_PORT):"
"with self.closing():<EOL><INDENT>packet = self.send(<EOL>TACACSAccountingStart(username, flags,<EOL>TAC_PLUS_AUTHEN_METH_TACACSPLUS,<EOL>priv_lvl, authen_type, arguments,<EOL>rem_addr=rem_addr, port=port),<EOL>TAC_PLUS_ACCT<EOL>)<EOL>reply = TACACSAccountingReply.unpacked(packet.body)<EOL>logger.debug('<STR_LIT:\n>'.join([<EOL>reply.__class__.__name__,<EOL>'<STR_LIT>' % packet.header,<EOL>'<STR_LIT>' % reply<EOL>]))<EOL><DEDENT>return reply<EOL>"
"Account with a TACACS+ server. :param username: :param flags: TAC_PLUS_ACCT_FLAG_START, TAC_PLUS_ACCT_FLAG_WATCHDOG, TAC_PLUS_ACCT_FLAG_STOP :param arguments: The authorization arguments :param authen_type: TAC_PLUS_AUTHEN_TYPE_ASCII, TAC_PLUS_AUTHEN_TYPE_PAP, TAC_PLUS_AUTHEN_TYPE_CHAP :param priv_lvl: Minimal Required priv_lvl. :param rem_addr: AAA request source, default to TAC_PLUS_VIRTUAL_REM_ADDR :param port: AAA port, default to TAC_PLUS_VIRTUAL_PORT :return: TACACSAccountingReply :raises: socket.timeout, socket.error"
"f55:c0:m7"
"def get_or_create(self, model, **spec):"
"return self.service.get_or_create(model, **spec)<EOL>"
"Args: model: class of Model get_data: the filter used for finding an instance create_data: the data used to create an instance, if none could be found"
"f62:c1:m7"
"def create_order(self, debtor, is_vat_included=True, due_date=None,<EOL>heading='<STR_LIT>', text_line1='<STR_LIT>', text_line2='<STR_LIT>',<EOL>debtor_data=None, delivery_data=None, products=None,<EOL>project=None, other_reference='<STR_LIT>', model=models.Order, **extra<EOL>):"
"debtor_data = debtor_data or {}<EOL>delivery_data = delivery_data or {}<EOL>delivery_date = delivery_data.get('<STR_LIT:date>', datetime.datetime.now())<EOL>our_reference = extra.get('<STR_LIT>', debtor.our_reference)<EOL>currency = extra.get('<STR_LIT>', debtor.currency)<EOL>layout = extra.get('<STR_LIT>', debtor.layout)<EOL>term_of_payment = extra.get('<STR_LIT>', debtor.term_of_payment)<EOL>date = extra.get('<STR_LIT:date>', datetime.datetime.now())<EOL>order_input = {<EOL>'<STR_LIT>': debtor,<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:1>),<EOL>'<STR_LIT>': project,<EOL>}<EOL>for dd in ['<STR_LIT:name>', '<STR_LIT:address>', '<STR_LIT>', '<STR_LIT>', '<STR_LIT>', '<STR_LIT>']:<EOL><INDENT>order_input['<STR_LIT>' % dd] = debtor_data.get(dd, getattr(debtor, dd))<EOL><DEDENT>for dd in ['<STR_LIT:address>', '<STR_LIT>', '<STR_LIT>', '<STR_LIT>']:<EOL><INDENT>order_input['<STR_LIT>' % dd] = delivery_data.get(dd, getattr(debtor, dd))<EOL><DEDENT>order_input.update({<EOL>'<STR_LIT>': delivery_date or datetime.datetime.now(),<EOL>'<STR_LIT>': heading,<EOL>'<STR_LIT>': text_line1,<EOL>'<STR_LIT>': text_line2,<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:0>),<EOL>'<STR_LIT:date>': date,<EOL>'<STR_LIT>': our_reference,<EOL>'<STR_LIT>': other_reference,<EOL>'<STR_LIT>': currency,<EOL>'<STR_LIT>': extra.get('<STR_LIT>', <NUM_LIT:1.0>),<EOL>'<STR_LIT>': is_vat_included,<EOL>'<STR_LIT>': layout,<EOL>'<STR_LIT>': due_date or datetime.datetime.now(),<EOL>'<STR_LIT>': term_of_payment<EOL>})<EOL>order_input.update(extra)<EOL>order = self.create(model, **order_input)<EOL>if products:<EOL><INDENT>for product in products:<EOL><INDENT>self.create_orderline(order, product)<EOL><DEDENT><DEDENT>return order<EOL>"
"Create a new Order. Args: debtor (Debtor): the debtor of the order debtor_data (mapping): map of debtor data {'postal_code: .., 'city': .., 'ean': ..} defaults to values on debitor instance for missing values delivery_data (mapping): map of delivery data {'address': ..., 'postal_code': ...} defaults to values on debitor instance for missing values due_date (datetime): due date heading (string): heading to be displayed in the order pdf text_line1 (string): first order description line text_line2 (string): second order description line other_reference (string): custom string to be used for identification extra (mapping): mapping of extra values to be passed in to the server call Returns: Order instance"
"f62:c1:m11"
"def build_model_code(client):"
"models = {}<EOL>references = {}<EOL>for method in client.wsdl.services[<NUM_LIT:0>].ports[<NUM_LIT:0>].methods.values():<EOL><INDENT>if not '<STR_LIT:_>' in method.name:<EOL><INDENT>continue<EOL><DEDENT>model, action = method.name.split('<STR_LIT:_>')<EOL>models.setdefault(model, {'<STR_LIT>': [], '<STR_LIT>': []})<EOL>references[model] = model<EOL>if model[-<NUM_LIT:1>] == '<STR_LIT:y>':<EOL><INDENT>references[model[:-<NUM_LIT:1>] + '<STR_LIT>'] = model<EOL><DEDENT>else:<EOL><INDENT>references[model+'<STR_LIT:s>'] = model<EOL><DEDENT><DEDENT>references['<STR_LIT>'] = '<STR_LIT>'<EOL>references['<STR_LIT>'] = '<STR_LIT>'<EOL>references['<STR_LIT>'] = '<STR_LIT>'<EOL>references['<STR_LIT>'] = '<STR_LIT>'<EOL>special = {<EOL>'<STR_LIT>': {<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>", "<STR_LIT>"]<EOL>},<EOL>'<STR_LIT>': {<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>", "<STR_LIT>"]<EOL>},<EOL>'<STR_LIT>': {<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>", "<STR_LIT>"]<EOL>}<EOL>}<EOL>for line in ['<STR_LIT>', '<STR_LIT>', '<STR_LIT>', '<STR_LIT>']:<EOL><INDENT>method = '<STR_LIT>' % line<EOL>special[method] = {<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>" % line, "<STR_LIT>" % method]<EOL>}<EOL><DEDENT>for method in client.wsdl.services[<NUM_LIT:0>].ports[<NUM_LIT:0>].methods.values():<EOL><INDENT>if not '<STR_LIT:_>' in method.name:<EOL><INDENT>continue<EOL><DEDENT>model, action = method.name.split('<STR_LIT:_>')<EOL>if action in ['<STR_LIT>', '<STR_LIT>', '<STR_LIT>']:<EOL><INDENT>continue<EOL><DEDENT>modeldata = models[model]<EOL>if action == '<STR_LIT>':<EOL><INDENT>camelname = action[<NUM_LIT:3>:]<EOL>modeldata['<STR_LIT>'].append({'<STR_LIT:name>': pythonize(camelname), '<STR_LIT>': action})<EOL><DEDENT>if re.findall('<STR_LIT>', action):<EOL><INDENT>camelname = action[<NUM_LIT:3>:]<EOL>modeldata['<STR_LIT>'].append({'<STR_LIT:name>': pythonize(camelname), '<STR_LIT>': action})<EOL><DEDENT>elif action.startswith('<STR_LIT>'):<EOL><INDENT>camelname = action[<NUM_LIT:6>:]<EOL>modeldata['<STR_LIT>'].append({'<STR_LIT:name>': pythonize(camelname), '<STR_LIT>': action})<EOL><DEDENT>elif action.startswith('<STR_LIT>'):<EOL><INDENT>propname = action[<NUM_LIT:3>:]<EOL>pyname = pythonize(propname)<EOL>if not propname:<EOL><INDENT>continue<EOL><DEDENT>get_type = re.findall('<STR_LIT>' % ('<STR_LIT:|>'.join(references.keys())), action)<EOL>if get_type and get_type[<NUM_LIT:0>] in references:<EOL><INDENT>refmodel = references[get_type[<NUM_LIT:0>]]<EOL>if action[-<NUM_LIT:1>] == '<STR_LIT:s>':<EOL><INDENT>modeldata['<STR_LIT>'].append({<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>" % propname, "<STR_LIT>" % refmodel, "<STR_LIT>" % method.name],<EOL>'<STR_LIT:name>': pyname<EOL>})<EOL><DEDENT>else:<EOL><INDENT>modeldata['<STR_LIT>'].append({<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>" % propname, "<STR_LIT>" % refmodel],<EOL>'<STR_LIT:name>': pyname<EOL>})<EOL><DEDENT><DEDENT>elif method.name in special:<EOL><INDENT>spdata = special[method.name]<EOL>modeldata['<STR_LIT>'].append({<EOL>'<STR_LIT:type>': spdata['<STR_LIT:type>'],<EOL>'<STR_LIT:args>': ["<STR_LIT>" % propname] + spdata['<STR_LIT:args>'],<EOL>'<STR_LIT:name>': pyname<EOL>})<EOL><DEDENT>else:<EOL><INDENT>modeldata['<STR_LIT>'].append({<EOL>'<STR_LIT:type>': '<STR_LIT>',<EOL>'<STR_LIT:args>': ["<STR_LIT>" % propname],<EOL>'<STR_LIT:name>': pyname<EOL>})<EOL><DEDENT><DEDENT><DEDENT>classes = []<EOL>for modelname, modeldata in models.items():<EOL><INDENT>propertycode = ["<STR_LIT>" % (md['<STR_LIT:name>'], md['<STR_LIT:type>'], '<STR_LIT:U+002CU+0020>'.join(md['<STR_LIT:args>']))<EOL>for md in modeldata['<STR_LIT>']]<EOL>code = "<STR_LIT>" % (modelname, '<STR_LIT>',<EOL>modeldata['<STR_LIT>'], '<STR_LIT>'.join(propertycode))<EOL>classes.append(code)<EOL><DEDENT>return "<STR_LIT>" + "<STR_LIT>".join(classes)<EOL>"
"Generate source code for e-conomic models based on WSDL connection. This is based on the assumption that the API follows a specific method naming-convention. Not all models and attributes has been tested. The source-generation is mostly to help improve readability and IDE auto-completion. :param client: :return: source code for models.py"
"f63:m3"
"def __find_handles(self, model, **spec):"
"server_calls = []<EOL>filter_names = dict([(f['<STR_LIT:name>'], f['<STR_LIT>'],) for f in model.get_filters()])<EOL>if not spec:<EOL><INDENT>server_calls.append({'<STR_LIT>': "<STR_LIT>" % model.__name__, '<STR_LIT:args>': []})<EOL><DEDENT>else:<EOL><INDENT>for key, value in spec.items():<EOL><INDENT>if not key in filter_names:<EOL><INDENT>raise ValueError("<STR_LIT>" % key)<EOL><DEDENT>args = []<EOL>if not hasattr(value, '<STR_LIT>'):<EOL><INDENT>value = [value]<EOL><DEDENT>if key.endswith('<STR_LIT>'):<EOL><INDENT>vtype = type(value[<NUM_LIT:0>]).__name__<EOL>array = self.soap_factory.create('<STR_LIT>' % vtype.capitalize())<EOL>getattr(array, "<STR_LIT:%s>" % vtype).extend(value)<EOL>args.append(array)<EOL><DEDENT>else:<EOL><INDENT>args.extend(value)<EOL><DEDENT>method = "<STR_LIT>" % (model.__name__, filter_names[key])<EOL>if filter_names[key].startswith('<STR_LIT>'):<EOL><INDENT>args = []<EOL><DEDENT>server_calls.append({'<STR_LIT>': method, '<STR_LIT:args>': args, '<STR_LIT>': "<STR_LIT>" % model.__name__})<EOL><DEDENT><DEDENT>handles = [<EOL>map(Handle, self.fetch_list(scall['<STR_LIT>'], scall.get('<STR_LIT>'), *scall['<STR_LIT:args>']))<EOL>for scall in server_calls<EOL>]<EOL>return [h.wsdl for h in reduce(set.intersection, map(set, handles))]<EOL>"
"find model instances based on given filter (spec) The filter is based on available server-calls, so some values might not be available for filtering. Multiple filter-values is going to do multiple server-calls. For complex filters in small datasets, it might be faster to fetch all and do your own in-memory filter. Empty filter will fetch all. :param model: subclass of EConomicsModel :param spec: mapping of values to filter by :return: a list of EConomicsModel instances"
"f63:c2:m10"
"def get(self, model, **spec):"
"handles = self.__find_handles(model, **spec)<EOL>if len(handles) > <NUM_LIT:1>:<EOL><INDENT>raise MultipleObjectsReturned()<EOL><DEDENT>if not handles:<EOL><INDENT>raise ObjectDoesNotExist()<EOL><DEDENT>return self.get_instance(model, handles[<NUM_LIT:0>])<EOL>"
"get a single model instance by handle :param model: model :param handle: instance handle :return:"
"f63:c2:m12"
"def has_add_permission(self, request):"
"return False<EOL>"
"Hides the add metric link in admin"
"f67:c0:m1"
"def get_queryset(self, request):"
"queryset = super(MetricGroupAdmin, self).get_queryset(request)<EOL>qs_values = queryset.values('<STR_LIT:id>', '<STR_LIT:name>')<EOL>distinct_names = {}<EOL>for metric in qs_values:<EOL><INDENT>distinct_names[metric['<STR_LIT:name>']] = metric['<STR_LIT:id>']<EOL><DEDENT>queryset = self.model.objects.filter(id__in=distinct_names.values())<EOL>return queryset<EOL>"
"Shows one entry per distinct metric name"
"f67:c1:m0"
"def save_model(self, request, obj, form, change):"
"like_metrics = self.model.objects.filter(name=obj.name)<EOL>updates = {}<EOL>for key in form.changed_data:<EOL><INDENT>updates[key] = form.cleaned_data[key]<EOL><DEDENT>like_metrics.update(**updates)<EOL>"
"Updates all metrics with the same name"
"f67:c1:m1"
"def generate_sample_data(point_numbers, interval):"
"src_names = ['<STR_LIT>', '<STR_LIT>', '<STR_LIT>']<EOL>sources = []<EOL>for name in src_names:<EOL><INDENT>sources.append(models.Source.objects.get_or_create(name=name)[<NUM_LIT:0>])<EOL><DEDENT>sources.append(None)<EOL>metric_names = ['<STR_LIT>',<EOL>'<STR_LIT>',<EOL>'<STR_LIT>',<EOL>'<STR_LIT>',<EOL>'<STR_LIT>']<EOL>for source in sources:<EOL><INDENT>for name in metric_names:<EOL><INDENT>metric = models.Metric.objects.get_or_create(source=source,<EOL>name=name)[<NUM_LIT:0>]<EOL>start = datetime.datetime.now() - datetime.timedelta(<EOL>minutes=interval * point_numbers)<EOL>for i in range(point_numbers):<EOL><INDENT>metric.latest_value = random.randint(<NUM_LIT:1>, <NUM_LIT:100>)<EOL>metric.last_updated = (start +<EOL>datetime.timedelta(minutes=interval * i))<EOL>metric.save()<EOL><DEDENT><DEDENT><DEDENT>"
"This function generates sample data and populates the databases :point_numbers: is an int defining the number of values for each metric :interval: is an int defining the interval between each results This method returns a list of metrics"
"f70:m0"
"def dashboard(request):"
"sources = (models.Source.objects.all().prefetch_related('<STR_LIT>')<EOL>.order_by('<STR_LIT:name>'))<EOL>metrics = SortedDict([(src, src.metric_set.all()) for src in sources])<EOL>no_source_metrics = models.Metric.objects.filter(source__isnull=True)<EOL>if no_source_metrics:<EOL><INDENT>metrics['<STR_LIT>'] = no_source_metrics<EOL><DEDENT>if request.META.get('<STR_LIT>', False):<EOL><INDENT>parent_template = '<STR_LIT>'<EOL><DEDENT>else:<EOL><INDENT>parent_template = '<STR_LIT>'<EOL><DEDENT>return render(request, '<STR_LIT>', {<EOL>'<STR_LIT>': metrics,<EOL>'<STR_LIT>': parent_template<EOL>})<EOL>"
"Shows the latest results for each source"
"f79:m0"
"def replace_variable(self, variable):"
"if variable == '<STR_LIT:x>':<EOL><INDENT>return self.value<EOL><DEDENT>if variable == '<STR_LIT:t>':<EOL><INDENT>return self.timedelta<EOL><DEDENT>raise ValueError("<STR_LIT>", variable)<EOL>"
"Substitute variables with numeric values"
"f82:c0:m1"
"def result(self):"
"<EOL>return self.eval_(ast.parse(self.expr).body[<NUM_LIT:0>].value)<EOL>"
"Evaluate expression and return result"
"f82:c0:m2"
"def _reset_changes(self):"
"self._original = {}<EOL>if self.last_updated is not None:<EOL><INDENT>self._original['<STR_LIT>'] = self.last_updated<EOL><DEDENT>"
"Stores current values for comparison later"
"f83:c1:m1"
"@property<EOL><INDENT>def whisper_filename(self):<DEDENT>"
"source_name = self.source_id and self.source.name or '<STR_LIT>'<EOL>return get_valid_filename("<STR_LIT>".format(source_name,<EOL>self.name))<EOL>"
"Build a file path to the Whisper database"
"f83:c1:m2"
"def get_or_create_archive(self):"
"return graph.WhisperDatabase(self.whisper_filename)<EOL>"
"Gets a Whisper DB instance. Creates it if it doesn't exist."
"f83:c1:m4"
"def load_archive(self, from_date, to_date=None):"
"return self.get_or_create_archive().fetch(from_date, to_date)<EOL>"
"Loads in historical data from Whisper database"
"f83:c1:m5"
"def get_value_display(self):"
"if self.display_as == '<STR_LIT>':<EOL><INDENT>return '<STR_LIT>'.format(self.latest_value)<EOL><DEDENT>if self.display_as == '<STR_LIT>':<EOL><INDENT>return bool(self.latest_value)<EOL><DEDENT>if self.display_as == '<STR_LIT>':<EOL><INDENT>return defaultfilters.filesizeformat(self.latest_value)<EOL><DEDENT>if self.display_as == '<STR_LIT>':<EOL><INDENT>return time.strftime('<STR_LIT>', time.gmtime(self.latest_value))<EOL><DEDENT>return self.latest_value<EOL>"
"Human friendly value output"
"f83:c1:m7"
"def time_between_updates(self):"
"if '<STR_LIT>' not in self._original:<EOL><INDENT>return <NUM_LIT:0><EOL><DEDENT>last_update = self._original['<STR_LIT>']<EOL>this_update = self.last_updated<EOL>return this_update - last_update<EOL>"
"Time between current `last_updated` and previous `last_updated`"
"f83:c1:m8"
"def do_transform(self):"
"if not self.transform:<EOL><INDENT>return<EOL><DEDENT>try:<EOL><INDENT>self.latest_value = utils.Transform(<EOL>expr=self.transform, value=self.latest_value,<EOL>timedelta=self.time_between_updates().total_seconds()).result()<EOL><DEDENT>except (TypeError, ValueError):<EOL><INDENT>logger.warn("<STR_LIT>",<EOL>self.transfrom, self.pk)<EOL><DEDENT>self.transform = '<STR_LIT>'<EOL>"
"Apply the transformation (if it exists) to the latest_value"
"f83:c1:m9"
"def do_counter_conversion(self):"
"if self.is_counter:<EOL><INDENT>if self._previous_counter_value is None:<EOL><INDENT>prev_value = self.latest_value<EOL><DEDENT>else:<EOL><INDENT>prev_value = self._previous_counter_value<EOL><DEDENT>self._previous_counter_value = self.latest_value<EOL>self.latest_value = self.latest_value - prev_value<EOL><DEDENT>"
"Update latest value to the diff between it and the previous value"
"f83:c1:m10"
"def _create(self):"
"if not os.path.exists(settings.SALMON_WHISPER_DB_PATH):<EOL><INDENT>os.makedirs(settings.SALMON_WHISPER_DB_PATH)<EOL><DEDENT>archives = [whisper.parseRetentionDef(retentionDef)<EOL>for retentionDef in settings.ARCHIVES.split("<STR_LIT:U+002C>")]<EOL>whisper.create(self.path, archives,<EOL>xFilesFactor=settings.XFILEFACTOR,<EOL>aggregationMethod=settings.AGGREGATION_METHOD)<EOL>"
"Create the Whisper file on disk"
"f91:c0:m2"
"def _update(self, datapoints):"
"if len(datapoints) == <NUM_LIT:1>:<EOL><INDENT>timestamp, value = datapoints[<NUM_LIT:0>]<EOL>whisper.update(self.path, value, timestamp)<EOL><DEDENT>else:<EOL><INDENT>whisper.update_many(self.path, datapoints)<EOL><DEDENT>"
"This method store in the datapoints in the current database. :datapoints: is a list of tupple with the epoch timestamp and value [(1368977629,10)]"
"f91:c0:m4"
"def fetch(self, from_time, until_time=None):"
"until_time = until_time or datetime.now()<EOL>time_info, values = whisper.fetch(self.path,<EOL>from_time.strftime('<STR_LIT:%s>'),<EOL>until_time.strftime('<STR_LIT:%s>'))<EOL>start_time, end_time, step = time_info<EOL>current = start_time<EOL>times = []<EOL>while current <= end_time:<EOL><INDENT>times.append(current)<EOL>current += step<EOL><DEDENT>return zip(times, values)<EOL>"
"This method fetch data from the database according to the period given fetch(path, fromTime, untilTime=None) fromTime is an datetime untilTime is also an datetime, but defaults to now. Returns a tuple of (timeInfo, valueList) where timeInfo is itself a tuple of (fromTime, untilTime, step) Returns None if no data can be returned"
"f91:c0:m5"
"def generate_settings():"
"conf_file = os.path.join(os.path.dirname(base_settings.__file__),<EOL>'<STR_LIT>', '<STR_LIT>')<EOL>conf_template = open(conf_file).read()<EOL>default_url = '<STR_LIT>'<EOL>site_url = raw_input("<STR_LIT>".format(<EOL>default_url))<EOL>site_url = site_url or default_url<EOL>secret_key = base64.b64encode(os.urandom(KEY_LENGTH))<EOL>api_key = base64.b64encode(os.urandom(KEY_LENGTH))<EOL>output = conf_template.format(api_key=api_key, secret_key=secret_key,<EOL>site_url=site_url)<EOL>return output<EOL>"
"This command is run when ``default_path`` doesn't exist, or ``init`` is run and returns a string representing the default data to put into their settings file."
"f92:m0"
"def configure_app(**kwargs):"
"sys_args = sys.argv<EOL>args, command, command_args = parse_args(sys_args[<NUM_LIT:1>:])<EOL>parser = OptionParser()<EOL>parser.add_option('<STR_LIT>', metavar='<STR_LIT>')<EOL>(options, logan_args) = parser.parse_args(args)<EOL>config_path = options.config<EOL>logan_configure(config_path=config_path, **kwargs)<EOL>"
"Builds up the settings using the same method as logan"
"f92:m2"
"def format(self):"
"self._netrc.hosts = dedictify_machines(self.machines)<EOL>rep = "<STR_LIT>"<EOL>for host in self._netrc.hosts.keys():<EOL><INDENT>attrs = self._netrc.hosts[host]<EOL>rep += "<STR_LIT>".format(host=host,<EOL>attrs=attrs)<EOL>if attrs[<NUM_LIT:1>]:<EOL><INDENT>rep += "<STR_LIT>".format(attrs=attrs)<EOL><DEDENT>rep += "<STR_LIT>".format(attrs=attrs)<EOL><DEDENT>for macro in self._netrc.macros.keys():<EOL><INDENT>rep += "<STR_LIT>".format(macro=macro)<EOL>for line in self._netrc.macros[macro]:<EOL><INDENT>rep += line<EOL><DEDENT>rep += "<STR_LIT:\n>"<EOL><DEDENT>return rep<EOL>"
"Dump the class data in the format of a .netrc file."
"f101:c0:m12"
"def find_version(fname):"
"version = '<STR_LIT>'<EOL>with open(fname, '<STR_LIT:r>') as fp:<EOL><INDENT>reg = re.compile(r'<STR_LIT>')<EOL>for line in fp:<EOL><INDENT>m = reg.match(line)<EOL>if m:<EOL><INDENT>version = m.group(<NUM_LIT:1>)<EOL>break<EOL><DEDENT><DEDENT><DEDENT>if not version:<EOL><INDENT>raise RuntimeError('<STR_LIT>')<EOL><DEDENT>return version<EOL>"
"Attempts to find the version number in the file names fname. Raises RuntimeError if not found."
"f102:m0"
"def __init__(self):"
"ArgumentParser.__init__(self, description=self.DESCRIPTION)<EOL>self._parameters = []<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action=JsonAction, dest='<STR_LIT>',<EOL>default=False,<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action=SaveJsonAction,<EOL>type=ChrisApp.path, dest='<STR_LIT>', metavar='<STR_LIT>',<EOL>help='<STR_LIT>')<EOL>if self.TYPE == '<STR_LIT>':<EOL><INDENT>ArgumentParser.add_argument(self, '<STR_LIT>', action='<STR_LIT:store>', type=str,<EOL>help='<STR_LIT>')<EOL><DEDENT>ArgumentParser.add_argument(self, '<STR_LIT>', action='<STR_LIT:store>', type=str,<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action='<STR_LIT:store>', dest='<STR_LIT>',<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action='<STR_LIT:store_true>',<EOL>dest='<STR_LIT>',<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action='<STR_LIT:store_true>',<EOL>dest='<STR_LIT>',<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action=VersionAction,<EOL>dest='<STR_LIT:version>', default=False,<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action=AppMetaDataAction,<EOL>dest='<STR_LIT>', default=False,<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', '<STR_LIT>', action='<STR_LIT:store>', type=str,<EOL>dest='<STR_LIT>', default="<STR_LIT:0>",<EOL>help='<STR_LIT>')<EOL>ArgumentParser.add_argument(self, '<STR_LIT>', action=ManPageAction,<EOL>dest='<STR_LIT>', default=False,<EOL>help="<STR_LIT>")<EOL>self.define_parameters()<EOL>"
"The constructor of this app."
"f104:c7:m0"
"@staticmethod<EOL><INDENT>def path(string):<DEDENT>"
"if not os.path.exists(string):<EOL><INDENT>msg = "<STR_LIT>" % string<EOL>raise ArgumentTypeError(msg)<EOL><DEDENT>return string<EOL>"
"Define the 'path' data type that can be used by apps."
"f104:c7:m1"
"def show_man_page(self):"
"pass<EOL>"
"Show the app's man page (abstract method in this class)."
"f104:c7:m2"
"def define_parameters(self):"
"raise NotImplementedError("<STR_LIT>")<EOL>"
"Define the parameters used by this app (abstract method in this class)."
"f104:c7:m3"
"def run(self, options):"
"raise NotImplementedError("<STR_LIT>")<EOL>"
"Execute this app (abstract method in this class)."
"f104:c7:m4"
"def add_argument(self, *args, **kwargs):"
"if not (('<STR_LIT:action>' in kwargs) and (kwargs['<STR_LIT:action>'] == '<STR_LIT>')):<EOL><INDENT>try:<EOL><INDENT>name = kwargs['<STR_LIT>']<EOL>param_type = kwargs['<STR_LIT:type>']<EOL>optional = kwargs['<STR_LIT>']<EOL><DEDENT>except KeyError as e:<EOL><INDENT>detail = "<STR_LIT>" % e<EOL>raise KeyError(detail)<EOL><DEDENT>if optional and ('<STR_LIT:default>' not in kwargs):<EOL><INDENT>detail = "<STR_LIT>" % name<EOL>raise KeyError(detail)<EOL><DEDENT>default = None<EOL>if '<STR_LIT:default>' in kwargs:<EOL><INDENT>default = kwargs['<STR_LIT:default>']<EOL><DEDENT>param_help = "<STR_LIT>"<EOL>if '<STR_LIT>' in kwargs:<EOL><INDENT>param_help = kwargs['<STR_LIT>']<EOL><DEDENT>if param_type not in (str, int, float, bool, ChrisApp.path):<EOL><INDENT>detail = "<STR_LIT>" % param_type<EOL>raise ValueError(detail)<EOL><DEDENT>action = '<STR_LIT:store>'<EOL>if param_type == bool:<EOL><INDENT>action = '<STR_LIT>' if default else '<STR_LIT:store_true>'<EOL>del kwargs['<STR_LIT:default>'] <EOL>del kwargs['<STR_LIT:type>']<EOL><DEDENT>kwargs['<STR_LIT:action>'] = action<EOL>param = {'<STR_LIT:name>': name, '<STR_LIT:type>': param_type.__name__, '<STR_LIT>': optional,<EOL>'<STR_LIT>': args[<NUM_LIT:0>], '<STR_LIT:action>': action, '<STR_LIT>': param_help, '<STR_LIT:default>': default}<EOL>self._parameters.append(param)<EOL>del kwargs['<STR_LIT>']<EOL><DEDENT>ArgumentParser.add_argument(self, *args, **kwargs)<EOL>"
"Add a parameter to this app."
"f104:c7:m5"
"def get_json_representation(self):"
"repres = {}<EOL>repres['<STR_LIT:type>'] = self.TYPE<EOL>repres['<STR_LIT>'] = self._parameters<EOL>repres['<STR_LIT>'] = self.ICON<EOL>repres['<STR_LIT>'] = self.AUTHORS<EOL>repres['<STR_LIT:title>'] = self.TITLE<EOL>repres['<STR_LIT>'] = self.CATEGORY<EOL>repres['<STR_LIT:description>'] = self.DESCRIPTION<EOL>repres['<STR_LIT>'] = self.DOCUMENTATION<EOL>repres['<STR_LIT>'] = self.LICENSE<EOL>repres['<STR_LIT:version>'] = self.VERSION<EOL>repres['<STR_LIT>'] = self.SELFPATH<EOL>repres['<STR_LIT>'] = self.SELFEXEC<EOL>repres['<STR_LIT>'] = self.EXECSHELL<EOL>repres['<STR_LIT>'] = self.MAX_NUMBER_OF_WORKERS<EOL>repres['<STR_LIT>'] = self.MIN_NUMBER_OF_WORKERS<EOL>repres['<STR_LIT>'] = self.MAX_MEMORY_LIMIT<EOL>repres['<STR_LIT>'] = self.MAX_CPU_LIMIT <EOL>repres['<STR_LIT>'] = self.MIN_MEMORY_LIMIT<EOL>repres['<STR_LIT>'] = self.MIN_CPU_LIMIT <EOL>repres['<STR_LIT>'] = self.MIN_GPU_LIMIT<EOL>repres['<STR_LIT>'] = self.MAX_GPU_LIMIT<EOL>return repres<EOL>"
"Return a JSON object with a representation of this app (type and parameters)."
"f104:c7:m6"
"def save_json_representation(self, dir_path):"
"file_name = self.__class__.__name__+ '<STR_LIT>'<EOL>file_path = os.path.join(dir_path, file_name)<EOL>with open(file_path, '<STR_LIT:w>') as outfile:<EOL><INDENT>json.dump(self.get_json_representation(), outfile)<EOL><DEDENT>"
"Save the app's JSON representation object to a JSON file."
"f104:c7:m7"
"def launch(self, args=None):"
"self.options = self.parse_args(args)<EOL>if self.options.saveinputmeta:<EOL><INDENT>self.save_input_meta()<EOL><DEDENT>if self.options.inputmeta:<EOL><INDENT>self.options = self.get_options_from_file(self.options.inputmeta)<EOL><DEDENT>self.run(self.options)<EOL>if self.options.saveoutputmeta:<EOL><INDENT>self.save_output_meta()<EOL><DEDENT>"
"This method triggers the parsing of arguments."
"f104:c7:m8"
"def get_options_from_file(self, file_path):"
"<EOL>with open(file_path) as options_file:<EOL><INDENT>options_dict = json.load(options_file)<EOL><DEDENT>options = []<EOL>for opt_name in options_dict:<EOL><INDENT>options.append(opt_name)<EOL>options.append(options_dict[opt_name])<EOL><DEDENT>return self.parse_args(options)<EOL>"
"Return the options parsed from a JSON file."
"f104:c7:m9"
"def save_input_meta(self):"
"options = self.options<EOL>file_path = os.path.join(options.outputdir, '<STR_LIT>')<EOL>with open(file_path, '<STR_LIT:w>') as outfile:<EOL><INDENT>json.dump(vars(options), outfile)<EOL><DEDENT>"
"Save the input meta data (options passed to the app) to a JSON file."
"f104:c7:m10"
"def save_output_meta(self):"
"options = self.options<EOL>file_path = os.path.join(options.outputdir, '<STR_LIT>')<EOL>with open(file_path, '<STR_LIT:w>') as outfile:<EOL><INDENT>json.dump(self.OUTPUT_META_DICT, outfile)<EOL><DEDENT>"
"Save descriptive output meta data to a JSON file."
"f104:c7:m11"
"def load_output_meta(self):"
"options = self.options<EOL>file_path = os.path.join(options.inputdir, '<STR_LIT>')<EOL>with open(file_path) as infile:<EOL><INDENT>return json.load(infile)<EOL><DEDENT>"
"Load descriptive output meta data from a JSON file in the input directory."
"f104:c7:m12"
"def get_version(self):"
"return self.VERSION<EOL>"
"Return the app's version."
"f104:c7:m13"
"def print_app_meta_data(self):"
"l_metaData = dir(self)<EOL>l_classVar = [x for x in l_metaData if x.isupper() ]<EOL>for str_var in l_classVar:<EOL><INDENT>str_val = getattr(self, str_var)<EOL>print("<STR_LIT>" % (str_var, str_val))<EOL><DEDENT>"
"Print the app's meta data."
"f104:c7:m14"
"def error(self, message):"
"print()<EOL>sys.stderr.write('<STR_LIT>' % message)<EOL>print()<EOL>self.print_help()<EOL>sys.exit(<NUM_LIT:2>)<EOL>"
"The error handler if wrong commandline arguments are specified."
"f104:c7:m15"
"def find_tarball(directory, name, version):"
"dir_contents = os.listdir(os.path.join(directory, '<STR_LIT>'))<EOL>candidates = [tarball for tarball in dir_contents<EOL>if tarball.endswith('<STR_LIT>')<EOL>and tarball.startswith(name + '<STR_LIT:->' + version)]<EOL>if not candidates:<EOL><INDENT>logger.error("<STR_LIT>",<EOL>name, version)<EOL>logger.error("<STR_LIT>", directory, dir_contents)<EOL>return<EOL><DEDENT>if len(candidates) > <NUM_LIT:1>:<EOL><INDENT>logger.warn("<STR_LIT>",<EOL>candidates)<EOL><DEDENT>tarball = candidates[<NUM_LIT:0>]<EOL>return os.path.join(directory, '<STR_LIT>', tarball)<EOL>"
"Return matching tarball filename from dist/ dir (if found). Setuptools generates a source distribution in a ``dist/`` directory and we need to find the exact filename, whether .tgz or .zip. We expect "name + '-' + version + '.tar.gz'", but we *can* get a -dev.r1234.tar.gz as that can be configured in a setup.cfg. Not pretty, but we don't want to force anyone to modify old tags."
"f108:m0"
"def checkout_dirs(self):"
"directories = [os.path.join(self.base_directory, d)<EOL>for d in os.listdir(self.base_directory)]<EOL>return [d for d in directories if os.path.isdir(d)]<EOL>"
"Return directories inside the base directory."
"f108:c0:m1"
"def missing_tags(self, existing_sdists=None):"
"if existing_sdists is None:<EOL><INDENT>existing_sdists = []<EOL><DEDENT>logger.debug("<STR_LIT>", existing_sdists)<EOL>if self._missing_tags is None:<EOL><INDENT>missing = []<EOL>existing_sdists = sorted_versions(set(existing_sdists))<EOL>available = set(self.wrapper.vcs.available_tags())<EOL>available_tags = sorted_versions(available)<EOL>available_tags.reverse()<EOL>for tag in available_tags:<EOL><INDENT>if tag.is_prerelease:<EOL><INDENT>logger.warn("<STR_LIT>", tag)<EOL>continue<EOL><DEDENT>if tag in existing_sdists:<EOL><INDENT>logger.debug(<EOL>"<STR_LIT>",<EOL>tag)<EOL>break<EOL><DEDENT>else:<EOL><INDENT>missing.append(tag)<EOL>logger.debug("<STR_LIT>", tag)<EOL><DEDENT><DEDENT>missing.reverse()<EOL>mapping = {}<EOL>for tag in available:<EOL><INDENT>mapping[parse_version(tag)] = tag<EOL><DEDENT>self._missing_tags = [mapping[tag] for tag in missing]<EOL><DEDENT>logger.debug("<STR_LIT>", self._missing_tags)<EOL>return self._missing_tags<EOL>"
"Return difference between existing sdists and available tags."
"f108:c1:m1"
"def create_sdist(self, tag):"
"logger.info("<STR_LIT>",<EOL>self.package, tag)<EOL>self.wrapper.vcs.checkout_from_tag(tag)<EOL>self.temp_tagdir = os.path.realpath(os.getcwd())<EOL>logger.debug("<STR_LIT>", self.temp_tagdir)<EOL>python = sys.executable<EOL>logger.debug(command("<STR_LIT>" % python))<EOL>tarball = find_tarball(self.temp_tagdir, self.package, tag)<EOL>return tarball<EOL>"
"Create an sdist and return the full file path of the .tar.gz."
"f108:c1:m2"
"def cleanup(self):"
"shutil.rmtree(self.temp_tagdir)<EOL>parentdir = os.path.dirname(self.temp_tagdir)<EOL>if os.path.basename(parentdir).startswith(self.package):<EOL><INDENT>os.rmdir(parentdir)<EOL><DEDENT>os.chdir(self.start_directory)<EOL>"
"Clean up temporary tag checkout dir."
"f108:c1:m3"
"def main():"
"usage = ("<STR_LIT>"<EOL>"<STR_LIT>"<EOL>"<STR_LIT>")<EOL>parser = optparse.OptionParser(usage=usage)<EOL>parser.add_option("<STR_LIT>", "<STR_LIT>",<EOL>action="<STR_LIT:store_true>", dest="<STR_LIT>", default=False,<EOL>help="<STR_LIT>")<EOL>parser.add_option("<STR_LIT>", "<STR_LIT>",<EOL>action="<STR_LIT:store_true>", dest="<STR_LIT>", default=False,<EOL>help="<STR_LIT>")<EOL>(options, args) = parser.parse_args()<EOL>if len(args) != <NUM_LIT:2>:<EOL><INDENT>parser.print_help()<EOL>return <NUM_LIT:1><EOL><DEDENT>checkouts_dir = args[<NUM_LIT:0>]<EOL>sdists_dir = args[<NUM_LIT:1>]<EOL>checkouts_dir = os.path.abspath(checkouts_dir)<EOL>sdists_dir = os.path.abspath(sdists_dir)<EOL>if options.verbose:<EOL><INDENT>log_level = logging.DEBUG<EOL><DEDENT>elif options.quiet:<EOL><INDENT>log_level = logging.WARN<EOL><DEDENT>else:<EOL><INDENT>log_level = logging.INFO<EOL><DEDENT>logging.basicConfig(level=log_level,<EOL>format="<STR_LIT>")<EOL>logger.info("<STR_LIT>",<EOL>checkouts_dir, sdists_dir)<EOL>package_dir = packagedir.PackageDir(sdists_dir)<EOL>package_dir.parse()<EOL>checkout_base_dir = checkoutdir.CheckoutBaseDir(checkouts_dir)<EOL>for directory in checkout_base_dir.checkout_dirs():<EOL><INDENT>logger.debug("<STR_LIT>", directory)<EOL>checkout_dir = checkoutdir.CheckoutDir(directory)<EOL>package = checkout_dir.package<EOL>if '<STR_LIT>' in package:<EOL><INDENT>continue<EOL><DEDENT>for tag in checkout_dir.missing_tags(<EOL>existing_sdists=package_dir.packages[package]):<EOL><INDENT>tarball = checkout_dir.create_sdist(tag)<EOL>package_dir.add_tarball(tarball, package)<EOL>checkout_dir.cleanup()<EOL><DEDENT><DEDENT>"
"bin/tags2sdists: create an sdist for a directory of checkouts."
"f109:m0"
"def __init__(self, root_directory):"
"self.root_directory = root_directory<EOL>self.packages = collections.defaultdict(list)<EOL>"
"Initialize with the root of the packages dir."
"f111:c0:m0"
"def parse(self):"
"for package in os.listdir(self.root_directory):<EOL><INDENT>directory = os.path.join(self.root_directory, package)<EOL>if not os.path.isdir(directory):<EOL><INDENT>continue<EOL><DEDENT>dir_contents = os.listdir(directory)<EOL>sdists = [tarball for tarball in dir_contents<EOL>if (tarball.endswith('<STR_LIT>')<EOL>and tarball.startswith(package + '<STR_LIT:->'))]<EOL>for sdist in sdists:<EOL><INDENT>version = sdist.replace('<STR_LIT>', '<STR_LIT>').replace(<EOL>package + '<STR_LIT:->', '<STR_LIT>')<EOL>self.packages[package].append(version)<EOL><DEDENT><DEDENT>"
"Iterate through the directory and extract package/version info."
"f111:c0:m1"
"def add_tarball(self, tarball, package):"
"if tarball is None:<EOL><INDENT>logger.error(<EOL>"<STR_LIT>",<EOL>package)<EOL>return<EOL><DEDENT>target_dir = os.path.join(self.root_directory, package)<EOL>if not os.path.exists(target_dir):<EOL><INDENT>os.mkdir(target_dir)<EOL>logger.info("<STR_LIT>", target_dir)<EOL><DEDENT>logger.info("<STR_LIT>", target_dir)<EOL>shutil.copy(tarball, target_dir)<EOL>"
"Add a tarball, possibly creating the directory if needed."
"f111:c0:m2"
"def command(cmd):"
"status, out = commands.getstatusoutput(cmd)<EOL>if status is not <NUM_LIT:0>:<EOL><INDENT>logger.error("<STR_LIT>")<EOL>logger.error(out)<EOL>raise SdistCreationError()<EOL><DEDENT>return out<EOL>"
"Execute command and raise an exception upon an error. >>> 'README' in command('ls') True >>> command('nonexistingcommand') #doctest: +ELLIPSIS Traceback (most recent call last): ... SdistCreationError"
"f112:m0"
"def print_importers():"
"import sys<EOL>import pprint<EOL>print('<STR_LIT>'),<EOL>pprint.pprint(sys.path)<EOL>print()<EOL>print('<STR_LIT>')<EOL>for name, cache_value in sys.path_importer_cache.items():<EOL><INDENT>name = name.replace(sys.prefix, '<STR_LIT>')<EOL>print('<STR_LIT>' % (name, cache_value))<EOL><DEDENT>"
"Helper function to print sys.path and importers cache"
"f122:m0"
"def _resolve_name(name, package, level):"
"bits = package.rsplit('<STR_LIT:.>', level - <NUM_LIT:1>)<EOL>if len(bits) < level:<EOL><INDENT>raise ValueError('<STR_LIT>')<EOL><DEDENT>base = bits[<NUM_LIT:0>]<EOL>return '<STR_LIT>'.format(base, name) if name else base<EOL>"
"Resolve a relative module name to an absolute one."
"f123:m0"
"def resolve_name(name, package):"
"if not name.startswith('<STR_LIT:.>'):<EOL><INDENT>return name<EOL><DEDENT>elif not package:<EOL><INDENT>raise ValueError('<STR_LIT>'<EOL>'<STR_LIT>'.format(name))<EOL><DEDENT>level = <NUM_LIT:0><EOL>for character in name:<EOL><INDENT>if character != '<STR_LIT:.>':<EOL><INDENT>break<EOL><DEDENT>level += <NUM_LIT:1><EOL><DEDENT>return _resolve_name(name[level:], package, level)<EOL>"
"Resolve a relative module name to an absolute one."
"f123:m1"
"def _find_spec_from_path(name, path=None):"
"if name not in sys.modules:<EOL><INDENT>return _find_spec(name, path)<EOL><DEDENT>else:<EOL><INDENT>module = sys.modules[name]<EOL>if module is None:<EOL><INDENT>return None<EOL><DEDENT>try:<EOL><INDENT>spec = module.__spec__<EOL><DEDENT>except AttributeError:<EOL><INDENT>six.raise_from(ValueError('<STR_LIT>'.format(name)), None)<EOL><DEDENT>else:<EOL><INDENT>if spec is None:<EOL><INDENT>raise ValueError('<STR_LIT>'.format(name))<EOL><DEDENT>return spec<EOL><DEDENT><DEDENT>"
"Return the spec for the specified module. First, sys.modules is checked to see if the module was already imported. If so, then sys.modules[name].__spec__ is returned. If that happens to be set to None, then ValueError is raised. If the module is not in sys.modules, then sys.meta_path is searched for a suitable spec with the value of 'path' given to the finders. None is returned if no spec could be found. Dotted names do not have their parent packages implicitly imported. You will most likely need to explicitly import all parent packages in the proper order for a submodule to get the correct spec."
"f123:m2"
"def find_spec(name, package=None):"
"fullname = resolve_name(name, package) if name.startswith('<STR_LIT:.>') else name<EOL>if fullname not in sys.modules:<EOL><INDENT>parent_name = fullname.rpartition('<STR_LIT:.>')[<NUM_LIT:0>]<EOL>if parent_name:<EOL><INDENT>parent = __import__(parent_name, fromlist=['<STR_LIT>'])<EOL>return _find_spec(fullname, parent.__path__)<EOL><DEDENT>else:<EOL><INDENT>return _find_spec(fullname, None)<EOL><DEDENT><DEDENT>else:<EOL><INDENT>module = sys.modules[fullname]<EOL>if module is None:<EOL><INDENT>return None<EOL><DEDENT>try:<EOL><INDENT>spec = module.__spec__<EOL><DEDENT>except AttributeError:<EOL><INDENT>six.raise_from(ValueError('<STR_LIT>'.format(name)), None)<EOL><DEDENT>else:<EOL><INDENT>if spec is None:<EOL><INDENT>raise ValueError('<STR_LIT>'.format(name))<EOL><DEDENT>return spec<EOL><DEDENT><DEDENT>"
"Return the spec for the specified module. First, sys.modules is checked to see if the module was already imported. If so, then sys.modules[name].__spec__ is returned. If that happens to be set to None, then ValueError is raised. If the module is not in sys.modules, then sys.meta_path is searched for a suitable spec with the value of 'path' given to the finders. None is returned if no spec could be found. If the name is for submodule (contains a dot), the parent module is automatically imported. The name and package arguments work the same as importlib.import_module(). In other words, relative module names (with leading dots) work."
"f123:m3"
"def set_package(fxn):"
"@functools.wraps(fxn)<EOL>def set_package_wrapper(*args, **kwargs):<EOL><INDENT>warnings.warn('<STR_LIT>',<EOL>DeprecationWarning, stacklevel=<NUM_LIT:2>)<EOL>module = fxn(*args, **kwargs)<EOL>if getattr(module, '<STR_LIT>', None) is None:<EOL><INDENT>module.__package__ = module.__name__<EOL>if not hasattr(module, '<STR_LIT>'):<EOL><INDENT>module.__package__ = module.__package__.rpartition('<STR_LIT:.>')[<NUM_LIT:0>]<EOL><DEDENT><DEDENT>return module<EOL><DEDENT>return set_package_wrapper<EOL>"
"Set __package__ on the returned module. This function is deprecated."
"f123:m4"
"def set_loader(fxn):"
"@functools.wraps(fxn)<EOL>def set_loader_wrapper(self, *args, **kwargs):<EOL><INDENT>warnings.warn('<STR_LIT>',<EOL>DeprecationWarning, stacklevel=<NUM_LIT:2>)<EOL>module = fxn(self, *args, **kwargs)<EOL>if getattr(module, '<STR_LIT>', None) is None:<EOL><INDENT>module.__loader__ = self<EOL><DEDENT>return module<EOL><DEDENT>return set_loader_wrapper<EOL>"
"Set __loader__ on the returned module. This function is deprecated."
"f123:m5"
"def module_for_loader(fxn):"
"warnings.warn('<STR_LIT>',<EOL>DeprecationWarning, stacklevel=<NUM_LIT:2>)<EOL>@functools.wraps(fxn)<EOL>def module_for_loader_wrapper(self, fullname, *args, **kwargs):<EOL><INDENT>with _module_to_load(fullname) as module:<EOL><INDENT>module.__loader__ = self<EOL>try:<EOL><INDENT>is_package = self.is_package(fullname)<EOL><DEDENT>except (ImportError, AttributeError):<EOL><INDENT>pass<EOL><DEDENT>else:<EOL><INDENT>if is_package:<EOL><INDENT>module.__package__ = fullname<EOL><DEDENT>else:<EOL><INDENT>module.__package__ = fullname.rpartition('<STR_LIT:.>')[<NUM_LIT:0>]<EOL><DEDENT><DEDENT>return fxn(self, module, *args, **kwargs)<EOL><DEDENT><DEDENT>return module_for_loader_wrapper<EOL>"
"Decorator to handle selecting the proper module for loaders. The decorated function is passed the module to use instead of the module name. The module passed in to the function is either from sys.modules if it already exists or is a new module. If the module is new, then __name__ is set the first argument to the method, __loader__ is set to self, and __package__ is set accordingly (if self.is_package() is defined) will be set before it is passed to the decorated function (if self.is_package() does not work for the module it will be set post-load). If an exception is raised and the decorator created the module it is subsequently removed from sys.modules. The decorator assumes that the decorated function takes the module name as the second argument."
"f123:m7"
"def all_suffixes():"
"return SOURCE_SUFFIXES + BYTECODE_SUFFIXES + EXTENSION_SUFFIXES<EOL>"
"Returns a list of all recognized module suffixes for this process"
"f124:m1"
End of preview (truncated to 100 rows)

No dataset card yet

New: Create and edit this dataset card directly on the website!

Contribute a Dataset Card
Downloads last month
0
Add dataset card
Evaluate models HF Leaderboard

Models trained or fine-tuned on microsoft/codexglue_method_generation