#!/usr/bin/env python3
# -*- coding: utf-8 -*-
''' This software provides some tools to exploit CSV files exported
from https://query.wikidata.org with
QuickStatements2 <https://tools.wmflabs.org/quickstatements/>.
Make sure your CSV file is UTF-8 encoded.
'''
__author__ = '[[User:Metamorforme42]]'
__credits__ = '2017 – Metamorforme42, CC0-1.0, use at your own risks'
__date__ = '2017-09-28'
import csv as _csv
from functools import wraps as _wraps
from inspect import signature as _signature
def _can_open_for_reading(path) -> bool:
    '''Return True when `path` can be opened for reading, False if missing.'''
    try:
        with open(path, 'r'):
            return True
    except FileNotFoundError:
        return False


def input_file_name(default='query', extension='csv', read_only=True) -> str:
    '''Ask the user for a file name and run some sanity checks on it.

    Parameters:
        default: base name (without extension) used when the user enters
            nothing.
        extension: expected extension, without the leading dot.
        read_only: True when the file will be read, False when it will be
            written.

    Returns the file name to use:
        * a name containing a dot is returned as typed (with a warning when
          it does not end with `.extension`);
        * a name without a dot, in read mode, is returned as typed when it
          can be opened, otherwise `name.extension` is assumed;
        * a name without a dot, in write mode, is returned as typed.

    Raises FileExistsError (write mode only) when the name has no extension
    and either that name or `name.extension` already exists, so that no
    existing file gets overwritten by mistake.
    '''
    my_file = input('Enter the name of your file (default {d}.{ext}): '.format(
        d=default, ext=extension))
    if not my_file:
        my_file = '{d}.{ext}'.format(d=default, ext=extension)

    if '.' in my_file:
        if not my_file.endswith('.{}'.format(extension)):
            print('WARNING: file extension is not `.{}`!'.format(extension))
        return my_file

    print('WARNING: file name given has no extension!')
    with_extension = '{name}.{ext}'.format(name=my_file, ext=extension)

    if read_only:
        if not _can_open_for_reading(my_file):
            my_file = with_extension
            print('WARNING: assume correct file name is `{}`.'.format(my_file))
        return my_file

    # Write mode: refuse as soon as one of the two candidate names exists.
    for candidate in (my_file, with_extension):
        if _can_open_for_reading(candidate):
            print('ERROR: no extension and file `{}` already exists!'.format(
                candidate))
            print('Sorry, but I don’t want you to lose your data.')
            print('Please re-run and give a correct name for the file.')
            raise FileExistsError(candidate)
    return my_file
class CsvDictList(list):
    '''List of dicts; each dict must be a row of the csv.

    `clean_ids()` strips `prefix` from the values of the id columns so
    that only the bare id is left.
    '''

    def __init__(self):
        list.__init__(self)
        self._prefix = 'http://www.wikidata.org/entity/'

    @property
    def prefix(self):
        '''Permanent prefix used on Qid in Query tool’s csv.
        You can also override this value in methods with the parameter
        `prefix`.
        '''
        return self._prefix

    @prefix.setter
    def prefix(self, value):
        '''Set the prefix; raises TypeError when `value` is not a string.'''
        if not isinstance(value, str):
            raise TypeError('prefix must be a string.')
        self._prefix = value

    def Qid(self, prefixed_Qid, prefix=None) -> str:
        '''Return `prefixed_Qid` without its prefix.

        `prefix` defaults to `self.prefix` when empty or None.
        A value that does not start with the prefix (already cleaned,
        literal column, empty cell) or that is not a string is returned
        unchanged instead of being truncated.
        '''
        if not prefix:
            prefix = self.prefix
        if isinstance(prefixed_Qid, str) and prefixed_Qid.startswith(prefix):
            return prefixed_Qid[len(prefix):]
        return prefixed_Qid

    def clean_ids(self, key_list=None, prefix=None, clean=None):
        '''Clean CSV’s Qids in place.

        key_list: keys of the columns to clean; detected automatically
            with `detect_col_Qid()` when None.
        prefix: prefix to strip; `self.prefix` when None.
        clean: set it to False to skip the cleaning (None means True).
        '''
        if clean is None:
            clean = True
        if not clean or not self:
            return  # nothing requested, or no row to clean
        if key_list is None:
            key_list = self.detect_col_Qid()
        for row in self:
            for key in key_list:
                row[key] = self.Qid(row[key], prefix=prefix)

    def detect_col_Qid(self) -> tuple:
        '''Detect columns containing entities, based on first row headers.

        Every key of the first row that does not end with `Label`, `Alias`
        or `Command` is kept. Returns an empty tuple when there is no row.
        '''
        if not self:
            return ()
        skipped_suffixes = ('Label', 'Alias', 'Command')
        return tuple(key for key in self[0]
                     if not key.endswith(skipped_suffixes))
def csv_dicts_copy(my_file) -> CsvDictList:
    '''Read `my_file` as csv and return its rows as a CsvDictList.

    Each row becomes a plain dict holding one entry per field name of
    the csv reader.
    '''
    reader = _csv.DictReader(my_file)
    rows = CsvDictList()
    rows.extend(
        {name: record[name] for name in reader.fieldnames}
        for record in reader
    )
    return rows
class QuickStatement_command:
    '''Builds the QuickStatements text for statements on one item.'''

    def __init__(self, Q=None, create=None):
        '''Target the item `Q`, or a new item when `create` is true.

        `create` wins over `Q`. Prints an error and raises Exception when
        neither is given.
        '''
        if not create and not Q:
            print('ERROR: no id passed and no creation requested!')
            raise Exception
        if create:
            # creation mode: the target is `LAST` and the text starts
            # with a CREATE command
            self._Q, self._content = 'LAST', '||CREATE'
        else:
            # Q has the format Q123 on Wikidata; the text starts empty
            self._Q, self._content = Q, ''

    @property
    def Q(self):
        '''Item the commands are applied on.'''
        return self._Q

    @property
    def content(self):
        '''Text to copy in QuickStatements.'''
        return self._content

    @content.setter
    def content(self, value):
        '''Assigning appends `value` to the existing text; nothing
        already stored is ever removed.
        '''
        self._content = self._content + value

    def _add_content(self, pattern, P, Q):
        '''Fill `pattern` and append the result to the content.'''
        self.content = pattern.format(item=self.Q, prop=P, sttmt=Q)

    def del_statement(self, P, Q):
        '''Append the removal of statement `P`/`Q`.'''
        self._add_content('||-{item}|{prop}|{sttmt}', P, Q)

    def add_statement(self, P, Q):
        '''Append the addition of statement `P`/`Q`.'''
        self._add_content('||{item}|{prop}|{sttmt}', P, Q)

    def add_qualifier(self, P, Q):
        '''Append a qualifier to the last statement added with
        `add_statement()`.
        Can be a source if P begins with `S` instead of `P`.
        '''
        self._add_content('|{prop}|{sttmt}', P, Q)

    def label(self, label, lang='en'):
        '''Append a label command for the language `lang`.'''
        self._add_content('||{item}|L{prop}|"{sttmt}"', lang, label)

    def alias(self, alias, lang='en'):
        '''Append an alias command for the language `lang`.'''
        self._add_content('||{item}|A{prop}|"{sttmt}"', lang, alias)

    def describe(self, description, lang='en'):
        '''Append a description command for the language `lang`.'''
        self._add_content('||{item}|D{prop}|"{sttmt}"', lang, description)
class ToolFunction:
    '''Decorator class recording which tools exist.'''

    # class attribute on purpose: one list shared by all the instances
    _reg = []

    def __call__(self, func):
        '''Register `func` (called when a tool function is decorated) and
        give it back unchanged.
        '''
        registry = ToolFunction._reg
        registry.append(func)
        return func

    @property
    def reg(self):
        '''Shared list of the registered tools.'''
        return ToolFunction._reg
def _wraps_sig(params=None):
'''This function should be used as decorator.
params should contain names of parameters to clear from the signature of
the function `func` passed.
'''
def decorator(func):
'''To use decorators with arguments, _wraps_sig is called with params;
then it returns decorator and then decorator is called with func.
params are in _wraps scope => decorator can access when called.
After having defined wrapper, we change it’s signature.
'''
if params is None:
wrapper = func
else:
@_wraps(func)
def wrapper(*args, **kwargs):
'''In this wrapper, we delete some args (because we don't
wan’t having 2 value for 1 arg. Then we execute the func.
'''
# init messages
warn_msg = 'WARNING: {fname} tool should not be called with `{para}`!'
# tests to avoid errors
for name in params:
try:
kwargs.pop(name)
except KeyError:
pass
return func(*args, **kwargs)
# remove parameters from wrapper signature
sig = _signature(func)
tab = sig.parameters.items()
numdelete = 0
for i, j in enumerate(tab):
if j[0] in params:
param = tuple(sig.parameters.values())[:i-numdelete]
param += tuple(sig.parameters.values())[i+1-numdelete:]
sig = sig.replace(parameters=param)
numdelete+=1
wrapper.__signature__ = sig
return wrapper
return decorator
def tool(func=None, interface_enter=None, interface_exit=None,
         cleanning_settings=None, init_tool=None) -> callable:
    '''Decorator turning a per-row function into a registered tool.

    Usable as `@tool` or as `@tool(...)` with keyword arguments.
    A tool function is called once per csv row with the keyword arguments
    `line` (the row) and `commands` (the value returned by `init_tool`);
    it must return a string, interpreted as QS command.

    Parameters:
        func: the tool function when used as a bare `@tool`.
        interface_enter: callable returning `(file name, data)`; the data
            is handed over to `interface_exit`. Defaults to asking the
            user for the input csv.
        interface_exit: callable taking `(command_list, data)`. Defaults
            to writing the commands in a file chosen by the user.
        cleanning_settings: dict of keyword arguments for
            `CsvDictList.clean_ids()`. Can also be given when calling the
            tool, in which case the call-time value wins.
        init_tool: callable run before anything else; its return value is
            passed to the tool function as `commands`.

    The generated tool returns None.
    '''
    if init_tool is None:
        def init_tool():
            '''Default initialisation: no parameters for the tool.'''
            return None

    if interface_enter is None:
        def interface_enter() -> (str, dict):
            '''Default interface before using the tool: ask for the
            input file name. No data is produced for interface_exit.
            '''
            print('==== input file ===='.capitalize())
            filename = input_file_name(default='query', extension='csv')
            return filename, None  # should be a {}

    if interface_exit is None:
        def interface_exit(command_list, interf_data):
            '''Default interface after using the tool: write the commands
            in a file; its content can be copied and pasted on
            https://tools.wmflabs.org/quickstatements
            '''
            print('==== output file ===='.capitalize())
            output_file = input_file_name(default='qs', extension='txt',
                                          read_only=False)
            # labels may hold non-ASCII text: do not depend on the locale
            with open(output_file, 'w', encoding='utf-8') as fichier:
                fichier.writelines(command_list)
            print('You can go on `https://tools.wmflabs.org/quickstatements` to run the command.')

    def decorator(func):
        '''Build and register the tool wrapping `func`.
        Called by `tool()` itself for a bare `@tool`, or by Python with
        the decorated function when `tool(...)` got parameters.
        '''
        tool_function = ToolFunction()

        @tool_function
        @_wraps_sig(params=['line', 'commands'])
        @_wraps(func)
        def wrapper(*args, **kwargs):
            commands = init_tool()
            my_file, interfaces_data = interface_enter()
            # The call-time setting is consumed here: it is meant for
            # clean_ids(), and forwarding it would make tool functions
            # that do not declare it fail with a TypeError.
            clset = kwargs.pop('cleanning_settings', None)
            if clset is None:
                clset = {} if cleanning_settings is None else cleanning_settings
            # the csv is expected to be UTF-8; newline='' is what the csv
            # module asks for
            with open(my_file, 'r', encoding='utf-8', newline='') as file_in:
                my_csv = csv_dicts_copy(file_in)
            my_csv.clean_ids(**clset)
            command_list = [
                func(*args, line=line, commands=commands, **kwargs)
                for line in my_csv
            ]
            interface_exit(command_list, interfaces_data)
            return None
        return wrapper

    if func is not None:
        # @tool without parameters
        return decorator(func)
    return decorator
@tool(cleanning_settings={'clean': False})
def create_ambassadors_from_a_category(line, ambassador_class,
                                       commands=None) -> str:
    '''This tool creates items for ambassadors (position not Q5) from a
    category (specific class of ambassador) containing categories
    associated to created items.

    line: csv row; `Category` is required, `enLabel` and `frLabel` are
        optional.
    ambassador_class: value of the `P31` statement of the new item.
    commands: unused.

    Returns the QuickStatements text creating the item.
    Raises KeyError when the row has no `Category` key.
    '''
    # we don't want to clean any id
    # because here, data is from petscan
    my_command = QuickStatement_command(create=True)
    # Each label column is looked up on its own, so a missing `enLabel`
    # column does not prevent `frLabel` from being used.
    for lang in ('en', 'fr'):
        label = line.get('{}Label'.format(lang))
        if label:
            my_command.label(lang=lang, label=label)
    my_command.add_statement('P910', line['Category'])
    my_command.add_statement('P31', ambassador_class)
    return my_command.content
@tool
def create_ambassadors_class(line, commands=None, defaultmode=None) -> str:
    '''This tool creates items for classes of ambassadors.
    Fill your csv with:
    country,category,enLabel,frLabel,enAlias,frAlias,modeCommand
    modeCommand should be either `to` or `of`.

    line: csv row. Labels and aliases are optional; a missing or empty
        `country` or `category` only triggers a warning.
    commands: unused.
    defaultmode: mode used when the row has no (or an empty)
        `modeCommand`.

    Returns the QuickStatements text creating the item.
    Raises KeyError when there is neither a `modeCommand` key nor a
    defaultmode, ValueError when the mode is not `of` or `to`.
    '''
    my_command = QuickStatement_command(create=True)

    # Optional texts: every column is looked up on its own, so one missing
    # column does not make the following ones ignored.
    for lang in ('en', 'fr'):
        label = line.get('{}Label'.format(lang))
        if label:
            my_command.label(label, lang=lang)
    for lang in ('en', 'fr'):
        alias = line.get('{}Alias'.format(lang))
        if alias:
            my_command.alias(alias, lang=lang)

    if 'modeCommand' in line:
        mode = line['modeCommand'] or defaultmode  # fallback on empty cell
    elif defaultmode:
        print('WARNING: no `modeCommand` key, defaultmode is used instead!')
        print('Please check you haven’t forgot it.')
        mode = defaultmode
    else:
        raise KeyError('modeCommand')

    if mode == 'to':
        my_command.add_statement('P31', 'Q29918335')
        my_command.add_statement('P279', 'Q29645886')
    elif mode == 'of':
        my_command.add_statement('P31', 'Q29918328')
        my_command.add_statement('P279', 'Q29645880')
    else:
        raise ValueError('`mode` parameter must be `of` or `to`!')

    country = line.get('country')
    if country:
        my_command.add_statement('P17', country)
    else:
        print('WARNING: missing country on a line!')
        print('Country is usefull for avoid duplicated items.')

    category = line.get('category')
    if category:
        my_command.add_statement('P910', category)
    else:
        print('WARNING: missing category on a line!')
        print('If the category exists, please consider adding this one to the item.')
    return my_command.content
@tool
def add_subject_to_categories(line, commands=None) -> str:
    '''Automatically add subject to categories (when inverse property is
    set): the item `line['cat']` gets a `P301` statement whose value is
    `line['item']`.
    You need to do a query with query.wikidata.org before.
    See [[User:Metamorforme42/Ambassadors]] to get the query.
    '''
    category_command = QuickStatement_command(Q=line['cat'])
    category_command.add_statement('P301', line['item'])
    return category_command.content
@tool
def add_P31_categories(line, commands=None) -> str:
    '''This tool adds `instance of category` on all listed items: the
    item `line['item']` gets the statement `P31` = `Q4167836`.
    '''
    item_command = QuickStatement_command(Q=line['item'])
    item_command.add_statement('P31', 'Q4167836')
    return item_command.content
@tool
def change_person_to_Q5(line, commands=None) -> str:
    '''This tool changes `is a list of:person` to `is a list of:human` on
    all listed items: on `line['sub']`, the statement `P360` = `Q215627`
    is deleted and `P360` = `Q5` is added.
    '''
    list_command = QuickStatement_command(Q=line['sub'])
    list_command.del_statement('P360', 'Q215627')
    list_command.add_statement('P360', 'Q5')
    return list_command.content
@tool
def cat_and_list(line, commands=None) -> str:
    '''This tool transforms `category/subject` to
    `list related to category/category related to list` on all listed
    items.

    On `line['category']`: `P301` is replaced by `P1753`, both with the
    value `line['list']`.
    On `line['list']`: `P910` is replaced by `P1754`, both with the value
    `line['item']`.
    Returns both command texts joined together.
    '''
    on_category = QuickStatement_command(Q=line['category'])
    on_category.del_statement('P301', line['list'])
    on_category.add_statement('P1753', line['list'])

    on_list = QuickStatement_command(Q=line['list'])
    # NOTE(review): the values below come from the `item` column while the
    # first half uses `category`; presumably both name the category —
    # confirm against the query producing the csv.
    on_list.del_statement('P910', line['item'])
    on_list.add_statement('P1754', line['item'])
    return on_category.content + on_list.content
def find_ambassador_linker(key, lang):
    '''Normalise `key` so that keys built from the ambassadors list and
    keys built from category names get a better chance to match.

    The key is lowercased, then the substitutions of the language `lang`
    (`en` or `fr`) are applied one after the other, in order. For `fr`,
    one leading space is removed at the end.
    Prints an error and raises Exception for any other `lang`.
    '''
    substitutions = {
        'en': (
            (' (country)', ''),
            ('the ', ''),
            ('united states of america', 'united states'),
            ("people's republic of china", 'china'),
            ('vatican city', 'vatican'),
            ('holy see', 'vatican'),
            ('apostolic nuncios', 'vatican'),
        ),
        'fr': (
            (' de ', ' '),
            (" d'", ' '),
            (" l'", ' '),
            (' du ', ' '),
            (' des ', ' '),
            (' auprès ', ' to '),
            (' à la ', ' to '),
            (' la ', ' '),
            (' au ', ' to '),
            (' aux ', ' to '),
            (' dans ', ' to '),
            (' en ', ' to '),
            (' à ', ' to '),
            (' près le ', ' to '),
            ('saint-siège', 'vatican'),
            ('cité du vatican', 'vatican'),
            ('république populaire ', ''),
        ),
    }
    key = key.lower()
    if lang not in ('en', 'fr'):
        print('Error: lang `{}` is not a correct code!'.format(lang))
        raise Exception
    # the order matters: later patterns see the result of earlier ones
    for old, new in substitutions[lang]:
        key = key.replace(old, new)
    if lang == 'fr' and key.startswith(' '):
        key = key[1:]
    return key
def find_ambassador_init_tool():
    '''This is an initialisation for find_ambassador tool.

    Asks the user for a language and for the csv listing the ambassadors
    items (columns `p1Label`, `p2Label`, `item2` and `item2Label` are
    read), then builds the lookup table used by the tool.

    Returns a dict with:
        `amb_list`: dict mapping the normalised key `<p1Label> to
            <p2Label>` to the tuple `(item2, item2Label)`;
        `lang`: the selected language code.
    '''
    lang_list = ['en', 'fr']
    help_csv = ('CSV with all instances of ambassadors+country needed\n'
                'help: [[User:Metamorforme42/Ambassadors]] query\n'
                '`display instance of ambassadors with countries`.')
    if len(lang_list) > 1:
        lang = None
        while lang not in lang_list:
            print('Select language ', end='(')
            print(*lang_list, sep='/', end='): ')
            lang = input().lower()
            if lang not in lang_list:
                print('Wrong lang code')
    else:
        # only one language available: take the code, not the list
        lang = lang_list[0]
    default_file_name = 'ambassadors_list_{language}'.format(language=lang)
    print(help_csv)
    n = input_file_name(default=default_file_name, extension='csv',
                        read_only=True)
    # the csv is expected to be UTF-8; newline='' is what the csv module
    # asks for
    with open(n, 'r', encoding='utf-8', newline='') as file_in:
        amblist = csv_dicts_copy(file_in)
    amblist.clean_ids()
    d = {}
    for line in amblist:
        key = '{} to {}'.format(line['p1Label'], line['p2Label'])
        key = find_ambassador_linker(key=key, lang=lang)
        d[key] = (line['item2'], line['item2Label'])
    return {'amb_list': d, 'lang': lang}
@tool(init_tool=find_ambassador_init_tool)
def find_ambassador(line, commands=None) -> str:
    '''This tool allows semi-auto link between categories and ambassadors
    items.

    line: csv row; `itemLabel` (category name) and `item` are read.
    commands: dict with exactly the keys `amb_list` (normalised key ->
        tuple whose first element is the ambassador item) and `lang`.

    The category name is normalised and looked up in `amb_list`; when a
    candidate is found the user is asked to confirm it.
    Returns the QuickStatements text linking both items (`P301` on the
    category, `P910` on the ambassador item), or an empty string when
    nothing is found or the user answers `no`.
    Raises ValueError when `commands` is missing or has other keys.
    '''
    l_keys = ['amb_list', 'lang']
    if commands is None:
        print('Error: ambassadors list is needed!')
        print('Error: language is needed!')
        raise ValueError('commands is needed')
    if set(commands) != set(l_keys):
        print('Error: commands contains wrong parameters')
        print('parameters given: ', *commands.keys())
        print('parameters needed: ', *l_keys)
        raise ValueError('commands contains wrong parameters')

    lang = commands['lang']
    amb_list = commands['amb_list']
    d_cat_lang = {'en': 'Category:Ambassadors of ',
                  'fr': 'Catégorie:Ambassadeur'}
    k = line['itemLabel'].replace(d_cat_lang[lang], '')
    k = find_ambassador_linker(key=k, lang=lang)

    # Only the lookup itself decides on NOT FOUND, so that an unrelated
    # missing key (e.g. no `item` column) is not reported as a failed match.
    if k not in amb_list:
        print(line['itemLabel'], '\t', k, '\t', 'NOT FOUND')
        return ''
    candidate = amb_list[k]
    print(line['itemLabel'], '\t', k, '\t', candidate)

    validation_values = ['yes', 'no']
    rep = None
    while rep not in validation_values:
        print('correct?', end=' ')
        print(*validation_values, sep='/', end=': ')
        rep = input().lower()
        if rep not in validation_values:
            print('Bad value ({}): try again.'.format(rep))
    if rep != 'yes':
        return ''

    c = QuickStatement_command(Q=line['item'])
    c.add_statement('P301', candidate[0])
    c2 = QuickStatement_command(Q=candidate[0])
    c2.add_statement('P910', line['item'])
    return ''.join([c.content, c2.content])
def tool_selection():
    '''Display the list of the registered tools.

    Not a real menu yet: it only warns that the tool to run must be
    selected by hand in the code, then prints the name of every function
    found in the ToolFunction registry.
    A later version is meant to ask the user for cleanning_settings and
    other settings.
    '''
    registered = ToolFunction().reg
    print('WARNING: you need to uncomment one line to use the associated tool.')
    print('Available functions:')
    for tool_func in registered:
        print('\t{}()'.format(tool_func.__name__))
if __name__ == '__main__':
    # Run as a script: list the available tools. To actually run a tool,
    # uncomment its line below.
    tool_selection()
    #find_ambassador()
    #cat_and_list()
    #change_person_to_Q5()
    #add_P31_categories()
    #add_subject_to_categories()
    #create_ambassadors_class(defaultmode='of', cleanning_settings={'clean':False})
    #create_ambassadors_from_a_category(ambassador_class='Qtest')