# User:Metamorforme42/QuickStatements

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

''' This software provides some tools to exploit CSV exports
from https://query.wikidata.org with
QuickStatements2 <https://tools.wmflabs.org/quickstatements/>.

Make sure your CSV file is UTF-8 encoded.
'''

# Module metadata: author (as a wiki user link), credits/licence notice and
# the date recorded for this revision.
__author__ = '[[User:Metamorforme42]]'
__credits__ = '2017 – Metamorforme42, CC0-1.0, use at your own risks'
__date__ = '2017-09-28'

import csv as _csv
from functools import wraps as _wraps
from inspect import signature as _signature

def input_file_name(default='query', extension='csv', read_only=True) -> str:
    '''Ask the user for a file name and sanity-check it.

    An empty answer selects `<default>.<extension>`.

    Parameters:
        default: base name (without extension) used when the answer is empty.
        extension: expected extension, without the leading dot.
        read_only: True when the file is going to be read, False when it is
            going to be written.

    Returns:
        The file name to use. In read mode, a name typed without extension
        that cannot be opened is completed with `.<extension>`; in every
        other case the name is returned exactly as typed.

    Raises:
        FileExistsError: in write mode, when the name has no extension and
            either that bare name or the name completed with the extension
            already exists (writing could destroy existing data).
    '''
    def can_open(path):
        '''Tell whether `path` exists, by trying to open it for reading.'''
        try:
            with open(path, 'r'):
                return True
        except FileNotFoundError:
            return False

    my_file = input('Enter the name of your file (default {d}.{ext}): '.format(
        d=default, ext=extension))
    if not my_file:
        my_file = '{d}.{ext}'.format(d=default, ext=extension)

    # A dot anywhere in the name is taken as "an extension was given".
    if '.' in my_file:
        if not my_file.endswith('.{}'.format(extension)):
            print('WARNING: file extension is not `.{}`!'.format(extension))
        return my_file

    print('WARNING: file name given has no extension!')
    with_extension = '{name}.{ext}'.format(name=my_file, ext=extension)

    if read_only:
        # Keep the bare name when it exists, otherwise guess the extension.
        if not can_open(my_file):
            my_file = with_extension
            print('WARNING: assume correct file name is `{}`.'.format(my_file))
        return my_file

    # Write mode: refuse to continue when either candidate already exists.
    # The bare name would be truncated by the caller, and an existing file
    # with the extension strongly suggests the user meant that one.
    for candidate in (my_file, with_extension):
        if can_open(candidate):
            print('ERROR: no extension and file `{}` already exists!'.format(
                candidate))
            print('Sorry, but I don’t want you to lose your data.')
            print('Please re-run and give a correct name for the file.')
            raise FileExistsError(
                'refusing to overwrite existing file: {}'.format(candidate))
    return my_file

class CsvDictList(list):
    '''List of dicts, each dict being one row of the CSV.

    The instance also remembers the URL prefix that precedes entity ids in
    the values, so that the ids can be reduced to their bare form.
    '''

    def __init__(self):
        list.__init__(self)
        self._prefix = 'http://www.wikidata.org/entity/'

    @property
    def prefix(self):
        '''Permanent prefix used on Qid in Query tool’s csv.

        It can be overridden per call with the `prefix` parameter of the
        methods.
        '''
        return self._prefix

    @prefix.setter
    def prefix(self, value):
        '''Set the permanent prefix.

        Raises:
            TypeError: when `value` is not a string.
        '''
        if not isinstance(value, str):
            print('ERROR: prefix must be a string.')
            raise TypeError('prefix must be a string, got {}'.format(
                type(value).__name__))
        self._prefix = value

    def Qid(self, prefixed_Qid, prefix=None) -> str:
        '''Return `prefixed_Qid` without its leading prefix.

        Parameters:
            prefixed_Qid: the value to clean.
            prefix: prefix to remove; any falsy value selects `self.prefix`.

        A value that does not start with the prefix is returned unchanged
        instead of being blindly truncated by the prefix length.
        '''
        used_prefix = prefix if prefix else self.prefix
        if prefixed_Qid.startswith(used_prefix):
            return prefixed_Qid[len(used_prefix):]
        return prefixed_Qid

    def clean_ids(self, key_list=None, prefix=None,
                    clean:'defaultly True'=None):
        '''Strip the prefix from the ids stored in every row, in place.

        Parameters:
            key_list: keys (columns) to clean; detected with
                `detect_col_Qid()` when None.
            prefix: prefix to remove, forwarded to `Qid()`.
            clean: set to False to skip cleaning entirely; None means True.
        '''
        if clean is None:
            clean = True
        if not clean:
            return
        if key_list is None:
            key_list = self.detect_col_Qid()
        for row in self:
            for key in key_list:
                value = row[key]
                # Only strings can carry a prefix; other values (e.g. None)
                # are left untouched.
                if isinstance(value, str):
                    row[key] = self.Qid(value, prefix=prefix)

    def detect_col_Qid(self) -> tuple:
        '''Detect columns holding entities, based on first row headers.

        Every key of the first row is kept unless it ends with `Label`,
        `Alias` or `Command`. An empty list yields an empty tuple.
        '''
        if not self:
            return ()
        excluded_suffixes = ('Label', 'Alias', 'Command')
        return tuple(key for key in self[0]
                     if not key.endswith(excluded_suffixes))

def csv_dicts_copy(my_file) -> CsvDictList:
    '''Read a CSV from the open file `my_file` into a new CsvDictList.

    Each row becomes a plain dict holding exactly the columns named in the
    header line.
    '''
    reader = _csv.DictReader(my_file)
    rows = CsvDictList()
    for record in reader:
        rows.append({name: record[name] for name in reader.fieldnames})
    return rows

class QuickStatement_command:
    '''Accumulate the QuickStatements text of commands targeting one item.'''

    def __init__(self, Q=None, create=None):
        '''Target the item `Q`, or a new item when `create` is truthy.

        A bare Exception is raised when neither is given.
        '''
        if create:
            # A created item is referred to as LAST in the commands.
            self._Q, self._content = 'LAST', '||CREATE'
            return
        if not Q:
            print('ERROR: no id passed and no creation requested!')
            raise Exception
        self._Q = Q  # expected form: Q123
        self._content = ''

    @property
    def Q(self):
        '''Identifier of the item the commands apply to.'''
        return self._Q

    @property
    def content(self):
        '''Text accumulated so far, to be pasted in QuickStatements.'''
        return self._content

    @content.setter
    def content(self, value):
        '''Append `value` to the text: assigning never discards what is
        already stored on the instance.
        '''
        self._content = ''.join((self._content, value))

    def _add_content(self, pattern, P, Q):
        '''Fill `pattern` (fields item/prop/sttmt) and append the result.'''
        fragment = pattern.format(item=self.Q, prop=P, sttmt=Q)
        self.content = fragment

    def del_statement(self, P, Q):
        '''Append the removal of statement `P` = `Q`.'''
        self._add_content('||-{item}|{prop}|{sttmt}', P, Q)

    def add_statement(self, P, Q):
        '''Append the creation of statement `P` = `Q`.'''
        self._add_content('||{item}|{prop}|{sttmt}', P, Q)

    def add_qualifier(self, P, Q):
        '''Append a qualifier to the statement appended just before.

        It is a source when `P` begins with `S` instead of `P`.
        '''
        self._add_content('|{prop}|{sttmt}', P, Q)

    def label (self, label, lang='en'):
        '''Append a command setting the label in language `lang`.'''
        self._add_content('||{item}|L{prop}|"{sttmt}"', lang, label)

    def alias (self, alias, lang='en'):
        '''Append a command setting an alias in language `lang`.'''
        self._add_content('||{item}|A{prop}|"{sttmt}"', lang, alias)

    def describe (self, description, lang='en'):
        '''Append a command setting the description in language `lang`.'''
        self._add_content('||{item}|D{prop}|"{sttmt}"', lang, description)

class ToolFunction:
    '''Decorator class keeping a record of every tool function it decorates.'''
    # One single list, stored on the class and therefore common to all
    # instances.
    _reg = []

    def __call__(self, func):
        '''Record `func` as a tool and hand it back unchanged.'''
        self.reg.append(func)
        return func

    @property
    def reg(self):
        '''The list of tools recorded so far.'''
        return ToolFunction._reg

def _wraps_sig(params=None):
    '''Decorator factory hiding some parameters of the decorated function.

    `params` holds the names of the parameters to remove from the signature
    of the function; with None the function is returned untouched.
    '''
    def decorator(func):
        '''Build the replacement for `func`.

        `params` comes from the enclosing call. The replacement discards the
        hidden names from its keyword arguments before delegating to `func`,
        and advertises the signature of `func` minus those names.
        '''
        if params is None:
            return func

        @_wraps(func)
        def wrapper(*args, **kwargs):
            # A hidden name must never reach func through kwargs, otherwise
            # it could receive two values for a single parameter.
            for hidden in params:
                kwargs.pop(hidden, None)
            return func(*args, **kwargs)

        # Advertise only the parameters that are not hidden.
        full_sig = _signature(func)
        visible = [parameter
                   for name, parameter in full_sig.parameters.items()
                   if name not in params]
        wrapper.__signature__ = full_sig.replace(parameters=visible)
        return wrapper
    return decorator


def tool(func=None, interface_enter=None, interface_exit=None,
                    cleanning_settings=None, init_tool=None) -> callable:
    '''Decorator turning a per-row function into a runnable tool.

    Usable bare (`@tool`) or with keyword arguments (`@tool(...)`).
    A tool function receives one CSV row as `line` and the value returned by
    `init_tool` as `commands`; it must return a string, which is
    interpreted as QS command.

    Parameters:
        func: the tool function when used as a bare decorator.
        interface_enter: callable returning `(file name, data)`; the file is
            the CSV to read and `data` is handed to `interface_exit`.
            Defaults to asking the user for the input file.
        interface_exit: callable receiving `(command_list, data)` once every
            row has been processed. Defaults to writing the commands to a
            file chosen by the user.
        cleanning_settings: keyword arguments for `CsvDictList.clean_ids`.
            A `cleanning_settings` keyword given when calling the tool takes
            precedence and is not forwarded to the tool function.
        init_tool: callable run first; its result is passed to the tool
            function as `commands`. Defaults to returning None.

    Returns:
        The decorated tool (bare use) or a decorator (use with arguments).
        The decorated tool is registered through `ToolFunction`, hides
        `line` and `commands` from its signature and returns None.
    '''
    # init init_tool
    if init_tool is None:
        def init_tool():
            '''Default initialisation: no parameters for the tool.'''
            return None

    # init interface_enter
    if interface_enter is None:
        def interface_enter() -> (str, dict):
            '''Default entry interface: ask for the CSV file to read.'''
            print('==== input file ===='.capitalize())
            filename = input_file_name(default='query', extension='csv')
            return filename, None # should be a {}

    # init interface_exit
    if interface_exit is None:
        def interface_exit(command_list, interf_data):
            '''Default exit interface: save the commands in a text file.'''
            print('==== output file ===='.capitalize())
            output_file = input_file_name(default='qs', extension='txt', read_only=False)
            # The commands may hold any label text, so the encoding is made
            # explicit instead of depending on the platform default.
            with open(output_file, 'w', encoding='utf-8') as out_file:
                out_file.writelines(command_list)
            print('You can go on `https://tools.wmflabs.org/quickstatements` to run the command.')

    def decorator (func):
        '''Wrap `func` into the full read/process/write sequence.'''
        tool_function = ToolFunction()

        @tool_function
        @_wraps_sig(params=['line', 'commands'])
        @_wraps(func)
        def wrapper(*args, **kwargs):
            commands = init_tool()
            my_file, interfaces_data = interface_enter()

            # Call-time settings win over decorator-time ones. They are
            # removed from kwargs because the tool function itself does not
            # accept a `cleanning_settings` parameter.
            clset = kwargs.pop('cleanning_settings', None)
            if clset is None:
                clset = {} if cleanning_settings is None else cleanning_settings

            # The CSV is expected to be UTF-8; newline='' is what the csv
            # module asks for when it is given a file object.
            with open(my_file, 'r', encoding='utf-8', newline='') as file_in:
                my_csv = csv_dicts_copy(file_in)
            my_csv.clean_ids(**clset)

            command_list = [func(*args, line=line, commands=commands, **kwargs)
                            for line in my_csv]
            interface_exit(command_list, interfaces_data)
            return None
        return wrapper

    if func is not None:
        # @tool without parameters
        return decorator(func)
    return decorator

@tool(cleanning_settings={'clean': False})
def create_ambassadors_from_a_category(line, ambassador_class,
                                                commands=None) -> str:
    '''This tool creates items for ambassadors (position not Q5) from a
    category (specific class of ambassador) containing categories associated
    to created items.

    Parameters:
        line: one CSV row; `Category` is required, `enLabel` and `frLabel`
            are optional.
        ambassador_class: value of the `P31` statement of the new item.
        commands: unused by this tool.

    Returns:
        The QuickStatements text creating the item.
    '''
    # No id cleaning is requested in the decorator because here, data is
    # from petscan.
    my_command = QuickStatement_command(create=True)
    # Each label is looked up on its own, so that a missing `enLabel` column
    # does not prevent `frLabel` from being used.
    for lang in ('en', 'fr'):
        label = line.get('{}Label'.format(lang))
        if label:
            my_command.label(lang=lang, label=label)
    my_command.add_statement('P910', line['Category'])
    my_command.add_statement('P31', ambassador_class)
    return my_command.content

@tool
def create_ambassadors_class (line, commands= None, defaultmode=None) -> str:
    '''This tool creates items for classes of ambassadors.

    Fill your csv with:
    country,category,enLabel,frLabel,enAlias,frAlias,modeCommand
    modeCommand should be either `to` or `of`.

    Parameters:
        line: one CSV row (it is not modified).
        commands: unused by this tool.
        defaultmode: mode used when the row has no (or an empty)
            `modeCommand`.

    Returns:
        The QuickStatements text creating the item.

    Raises:
        KeyError: the row has no `modeCommand` key and no defaultmode is set.
        ValueError: the resulting mode is neither `to` nor `of`.
    '''
    my_command = QuickStatement_command(create=True)
    # Labels first, then aliases; every column is optional and looked up on
    # its own so that one missing column does not discard the others.
    for lang in ('en', 'fr'):
        label = line.get('{}Label'.format(lang))
        if label:
            my_command.label(label, lang=lang)
    for lang in ('en', 'fr'):
        alias = line.get('{}Alias'.format(lang))
        if alias:
            my_command.alias(alias, lang=lang)

    mode = line.get('modeCommand')
    if not mode:
        if 'modeCommand' not in line:
            if not defaultmode:
                raise KeyError('modeCommand')
            print('WARNING: no `modeCommand` key, using defaultmode!')
            print('Please check you haven’t forgot it.')
        mode = defaultmode # fallback

    mode_statements = {
        'to': (('P31', 'Q29918335'), ('P279', 'Q29645886')),
        'of': (('P31', 'Q29918328'), ('P279', 'Q29645880')),
    }
    if mode not in mode_statements:
        print('ERROR: `mode` parameter must be `of` or `to`!')
        raise ValueError('mode must be `of` or `to`, got {!r}'.format(mode))
    for prop, value in mode_statements[mode]:
        my_command.add_statement(prop, value)

    country = line.get('country')
    if country:
        my_command.add_statement('P17', country)
    else:
        print('WARNING: missing country on a line!')
        print('Country is usefull for avoid duplicated items.')
    category = line.get('category')
    if category:
        my_command.add_statement('P910', category)
    else:
        print('WARNING: missing category on a line!')
        print('If the category exists, please consider adding this one to the item.')
    return my_command.content

@tool
def add_subject_to_categories (line, commands=None) -> str:
    '''Link each category to its subject (when inverse property is set).

    The rows must come from a query made with query.wikidata.org beforehand;
    see [[User:Metamorforme42/Ambassadors]] to get the query.
    Reads the `cat` and `item` columns and returns the command adding
    `P301` = item on the category.
    '''
    category_command = QuickStatement_command(Q=line['cat'])
    category_command.add_statement(P='P301', Q=line['item'])
    return category_command.content

@tool
def add_P31_categories (line, commands=None) -> str:
    '''Mark the item of each row as `instance of category`.

    Reads the `item` column and returns the command adding
    `P31` = `Q4167836` on it.
    '''
    item_command = QuickStatement_command(Q=line['item'])
    item_command.add_statement(P='P31', Q='Q4167836')
    return item_command.content

@tool
def change_person_to_Q5(line, commands=None) -> str:
    '''Replace `is a list of:person` by `is a list of:human` on the item of
    each row.

    Reads the `sub` column; the `P360` = `Q215627` statement is removed and
    `P360` = `Q5` is added.
    '''
    list_command = QuickStatement_command(Q=line['sub'])
    for edit, target in ((list_command.del_statement, 'Q215627'),
                         (list_command.add_statement, 'Q5')):
        edit('P360', target)
    return list_command.content

@tool
def cat_and_list(line, commands=None) -> str:
    '''Turn `category/subject` links into
    `list related to category/category related to list` on each row.

    Reads the `category`, `list` and `item` columns and returns the commands
    for the category followed by the commands for the list.
    '''
    category, related_list = line['category'], line['list']

    # On the category: P301 -> P1753, both pointing at the list.
    on_category = QuickStatement_command(Q=category)
    on_category.del_statement('P301', related_list)
    on_category.add_statement('P1753', related_list)

    # On the list: P910 -> P1754.
    # NOTE(review): these two statements use the `item` column, not
    # `category` — confirm against the query that this is intended.
    on_list = QuickStatement_command(Q=related_list)
    on_list.del_statement('P910', line['item'])
    on_list.add_statement('P1754', line['item'])

    return on_category.content + on_list.content

def find_ambassador_linker(key, lang):
    '''Normalise `key` so that keys built from the ambassadors list and keys
    built from category names have a better chance to be equal.

    `key` is lowercased, then the replacements of the language `lang`
    (`en` or `fr`) are applied one after the other, in order. Any other
    language code raises a bare Exception.
    '''
    english = (
        (' (country)', ''),
        ('the ', ''),
        ('united states of america', 'united states'),
        ("people's republic of china", 'china'),
        ('vatican city', 'vatican'),
        ('holy see', 'vatican'),
        ('apostolic nuncios', 'vatican'),
    )
    french = (
        (' de ', ' '),
        (" d'", ' '),
        (" l'", ' '),
        (' du ', ' '),
        (' des ', ' '),
        (' auprès ', ' to '),
        (' à la ', ' to '),
        (' la ', ' '),
        (' au ', ' to '),
        (' aux ', ' to '),
        (' dans ', ' to '),
        (' en ', ' to '),
        (' à ', ' to '),
        (' près le ', ' to '),
        ('saint-siège', 'vatican'),
        ('cité du vatican', 'vatican'),
        ('république populaire ', ''),
    )

    key = key.lower()
    if lang == 'en':
        rules = english
    elif lang == 'fr':
        rules = french
    else:
        print('Error: lang `{}` is not a correct code!'.format(lang))
        raise Exception

    # Order matters: a replacement may create or destroy a later match.
    for old, new in rules:
        key = key.replace(old, new)

    # French keys may be left with one leading space once the article or
    # preposition that opened them is gone.
    if lang == 'fr' and key.startswith(' '):
        key = key[1:]
    return key

def find_ambassador_init_tool():
    '''Initialisation for the find_ambassador tool.

    Asks the user for a language and for the CSV listing the ambassadors
    items, then indexes this list.

    Returns:
        A dict with two keys: `lang`, the chosen language code, and
        `amb_list`, a dict mapping a normalised `<p1Label> to <p2Label>` key
        to the tuple `(item2, item2Label)` of the row.
    '''
    lang_list = ['en', 'fr']
    help_csv = '''CSV with all instances of ambassadors+country needed
help: [[User:Metamorforme42/Ambassadors]] query
`display instance of ambassadors with countries`.'''
    if len(lang_list) > 1:
        lang = None
        while lang not in lang_list:
            print('Select language ', end='(')
            print(*lang_list, sep='/', end='): ')
            lang = input().lower()
            if lang not in lang_list:
                print('Wrong lang code')
    else:
        # Only one language available: take the code itself, not the list.
        lang = lang_list[0]
    default_file_name = 'ambassadors_list_{language}'.format(language=lang)
    print(help_csv)
    n = input_file_name(default=default_file_name, extension ='csv',
                        read_only=True)
    # The CSV is expected to be UTF-8; newline='' is what the csv module
    # asks for when it is given a file object.
    with open (n, 'r', encoding='utf-8', newline='') as file_in:
        amblist = csv_dicts_copy(file_in)
    amblist.clean_ids()

    d = {}
    for line in amblist:
        key = '{} to {}'.format(line['p1Label'], line['p2Label'])
        key = find_ambassador_linker(key=key, lang=lang)
        d[key] = (line['item2'], line['item2Label'])
    return {'amb_list': d, 'lang': lang}

@tool(init_tool=find_ambassador_init_tool)
def find_ambassador(line, commands=None) ->str:
    '''This tool allows semi-automatic linking between categories and
    ambassadors items.

    Parameters:
        line: one CSV row with the `item` and `itemLabel` columns.
        commands: dict with exactly the keys `amb_list` (normalised key ->
            tuple whose first element is the ambassador item) and `lang`.

    Returns:
        The commands linking the category and the ambassador item in both
        directions, or an empty string when no candidate is found or when
        the user rejects the candidate.

    Raises:
        ValueError: `commands` is None or does not hold exactly the
            expected keys.
        KeyError: the row lacks `item` or `itemLabel`, or `lang` is not a
            supported language.
    '''
    if commands is None:
        print('Error: ambassadors list is needed!')
        print('Error: language is needed!')
        raise ValueError('commands is required')

    l_keys = ['amb_list', 'lang']
    if set(commands) != set(l_keys):
        print('Error: commands contains wrong parameters')
        print('parameters given: ', *commands.keys())
        print('parameters needed: ', *l_keys)
        raise ValueError('commands must hold exactly: {}'.format(
            ', '.join(l_keys)))

    d_cat_lang =   {'en': 'Category:Ambassadors of ',
                    'fr': 'Catégorie:Ambassadeur'}

    k = line['itemLabel'].replace(d_cat_lang[commands['lang']], '')
    k = find_ambassador_linker(key=k, lang=commands['lang'])

    # Only the lookup of the candidate decides "NOT FOUND"; a missing column
    # in the row is a real error and must not be reported as such.
    candidate = commands['amb_list'].get(k)
    if candidate is None:
        print(line['itemLabel'], '\t', k, '\t', 'NOT FOUND')
        return ''

    validation_values = ['yes', 'no']
    print(line['itemLabel'], '\t', k, '\t', candidate)
    rep = None
    while rep not in validation_values:
        print('correct?', end=' ')
        print(*validation_values, sep='/', end=': ')
        rep = input().lower()
        if rep not in validation_values:
            print('Bad value ({}): try again.'.format(rep))

    if rep != 'yes':
        return ''

    ambassador_item = candidate[0]
    c = QuickStatement_command(Q=line['item'])
    c.add_statement('P301', ambassador_item)
    c2 = QuickStatement_command(Q=ambassador_item)
    c2.add_statement('P910', line['item'])
    return ''.join([c.content, c2.content])

def tool_selection():
    '''Print the list of available tools.

    Not a real menu yet: it only warns that the tool to run has to be
    selected by hand in the code, then lists the registered tool functions.
    A later version is meant to ask the user for cleanning_settings and
    other settings.
    '''
    registered_tools = ToolFunction().reg
    print('WARNING: you need to uncomment one line to use the associated tool.')
    print('Available functions:')
    for tool_func in registered_tools:
        print('\t{}()'.format(tool_func.__name__))



if __name__ == '__main__':
    # Run as a script: only tool_selection() is active, which prints the
    # list of registered tools. To actually run a tool, uncomment one of
    # the calls below.
    tool_selection()
    #find_ambassador()
    #cat_and_list()
    #change_person_to_Q5()
    #add_P31_categories()
    #add_subject_to_categories()
    #create_ambassadors_class(defaultmode='of', cleanning_settings={'clean':False})
    #create_ambassadors_from_a_category(ambassador_class='Qtest')