User:Deryck Chan/Property migration tool

Run this code in a Python file (or Jupyter notebook) from a directory where Pywikibot is configured. You'll be prompted to confirm each edit with 'y', 'n', or 'break' (exit the program).
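
Before running, you may want to check that Pywikibot is configured and logged in for Wikidata; a minimal sanity check (assuming a working user-config.py) is:

import pywikibot
site = pywikibot.Site("wikidata", "wikidata")
site.login()        #no-op if you already have a valid session
print(site.user())  #prints your username if the login works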

#input vars
main_property = 'P1685'
qualifier_property = 'P642'
new_qualifier_property = 'P972'
allowed_qualifier_targets = set() #leave empty for everything to be processed; change to e.g. {'Q1234', 'Q5678'} to limit which qualifier targets are operated on

limit = 3

logFileName = 'migrator1log.txt'

edit_summary = qualifier_property + ' is no longer used with ' + main_property + '; migrating ' + main_property + '/' + qualifier_property + ' to ' + main_property + '/' + new_qualifier_property
no_promptbox = 0 #set to 1 to process the whole query result automatically, 0 to prompt for each edit

# query_filename = 'qualifier_migrate.rq'
QUERY = """SELECT DISTINCT ?item ?itemLabel ?property ?propertyLabel ?value ?asObject ?asObjectLabel
WHERE
{
  wd:""" + main_property + """ wikibase:claim ?p .
  ?prop pq:""" + qualifier_property + """ ?asObject .
  hint:Query hint:optimizer "None" .	
  ?item ?p ?prop . 
  ?property wikibase:claim ?p .  
  ?property wikibase:statementProperty ?ps .
  ?prop ?ps ?value .
  SERVICE wikibase:label { bd:serviceParam wikibase:language "en,bg"  }    
}
ORDER BY ASC(?value) 
LIMIT """ + str(limit)

#start of actual script
import pywikibot
from pywikibot import pagegenerators as pg

import datetime

site = pywikibot.Site("wikidata", "wikidata")
wikidata_site = site #compatibility stuff
repo = site.data_repository()    

def getLabelFromObject(WDObject):
    #return the English label of an entity, falling back to its ID if there is none
    item_dict = WDObject.get()
    item_label = False
    if 'labels' in item_dict and 'en' in item_dict['labels']:
        item_label = item_dict['labels']['en']
    return item_label if item_label else WDObject.getID()

def getLabelFromWDID(ID): #works for properties only; items would need ItemPage instead of PropertyPage
    prop_page = pywikibot.PropertyPage(repo, ID)
    return getLabelFromObject(prop_page)
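
#a matching helper for item labels would look like this (hypothetical; not needed below):
#def getItemLabelFromWDID(ID):
#    return getLabelFromObject(pywikibot.ItemPage(repo, ID))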

main_property_label = getLabelFromWDID(main_property)
qualifier_property_label = getLabelFromWDID(qualifier_property)
new_qualifier_property_label = getLabelFromWDID(new_qualifier_property)

#replaced by including SPARQL inside this program
#with open(query_filename, 'r') as query_file:
#    QUERY = query_file.read()

generator = pg.WikidataSPARQLPageGenerator(QUERY, site=wikidata_site)
generator = site.preloadpages(generator, pageprops=True)

break_flag = 0
edit_count = 0

f = open(logFileName, 'a')
print(datetime.datetime.now(), file=f)

for item in generator:
    #operate on the most recent one for testing
    item_dict = item.get()
    item_label = getLabelFromObject(item)
            
    print('Now working on ', item.getID(), ' ', item_label)#, 'Ready? ("yes" to go, "break" to stop)')
    print(item.getID(), file=f)

    for claim_object in item_dict['claims'][main_property]:
        claim_target = claim_object.getTarget()
        claim_target_label = getLabelFromObject(claim_target) if type(claim_target) == pywikibot.page.ItemPage else '[none]'
        
        if qualifier_property in claim_object.qualifiers:
            for qualifier_object in claim_object.qualifiers[qualifier_property]:
                qualifier_target = qualifier_object.getTarget()
                if (len(allowed_qualifier_targets) > 0 and 
                    qualifier_target.getID() not in allowed_qualifier_targets):
                    continue
                    
                qualifier_target_label = getLabelFromObject(qualifier_target) if type(qualifier_target) == pywikibot.page.ItemPage else '[none]'
                    
                print(item_label, main_property_label, claim_target_label, 
                      qualifier_property_label, qualifier_target_label, 
                      'change to', new_qualifier_property_label, '!')

                acceptable_prompt_set = {'y', 'yes', 'n', 'no', 'break'}
                promptbox = ''
                if (no_promptbox == 1):
                    promptbox = 'yes' #skip the prompt entirely when running automatically
                while (promptbox not in acceptable_prompt_set):
                    promptbox = input()

                if promptbox == 'y' or promptbox == 'yes':
                    qualifier_dict = qualifier_object.toJSON()
                    qualifier_dict['property'] =  new_qualifier_property #set up the qualifier change
                    print('Changing to', new_qualifier_property_label)
                    new_qualifier_object = qualifier_object.qualifierFromJSON(site = wikidata_site, data = qualifier_dict)
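                    #note: addQualifier below adds the new qualifier but leaves the old
                    #qualifier_object on the claim; claim_object.removeQualifier(qualifier_object,
                    #summary=edit_summary) could remove it afterwards, once you have checked
                    #that the new qualifier saved correctly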
                    claim_object.addQualifier(new_qualifier_object, summary=edit_summary)
                    edit_count = edit_count + 1

                elif promptbox == 'n' or promptbox == 'no':
                    print('Skipped')
                    
                elif promptbox == 'break':
                    break_flag = 1
                    break

        if break_flag == 1:
            break
    if break_flag == 1:
        break

print('All done, thanks for using! We edited', edit_count, 'qualifiers.')
f.close()

Migration status check

To get statistics on how much a particular property is used as a qualifier, use this script. It prints progress output as it runs, which is useful because the SPARQL queries take several minutes to complete. The resulting wikitext table is saved to a file called "status_table.txt" in the same directory as the script.
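
Each row written to status_table.txt has this wikitext form (with illustrative values):

|-
| {{P|P1234}} || 42 || 7

i.e. the property, its item count, and its target count, matching the progress output. The script writes only the body rows, so wrap them in {| ... |} and add your own header row before pasting them into a wiki page.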

import pywikibot
from pywikibot import pagegenerators as pg

site = pywikibot.Site("wikidata", "wikidata")
wikidata_site = site #compatibility stuff
repo = site.data_repository()    

def SPARQLPageCounter(query, site=None, item_name='item',
                      endpoint=None, entity_url=None, result_type=set):
    #forcefully repurposed from WikidataSPARQLPageGenerator: count the results instead of yielding pages
    from pywikibot.data import sparql

    if site is None:
        site = pywikibot.Site()
    repo = site.data_repository()
    dependencies = {'endpoint': endpoint, 'entity_url': entity_url}
    if not endpoint or not entity_url:
        dependencies['repo'] = repo
    query_object = sparql.SparqlQuery(**dependencies)
    data = query_object.get_items(query,
                                  item_name=item_name,
                                  result_type=result_type)
    #items_pages = (pywikibot.ItemPage(repo, item) for item in data)
    #if isinstance(site, pywikibot.site.DataSite):
    #    return items_pages
    
    return len(data)
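
#example: count distinct humans returned by a capped query (illustrative query, not used below)
#SPARQLPageCounter("SELECT DISTINCT ?item WHERE { ?item wdt:P31 wd:Q5 } LIMIT 5", site=site)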

#pip install sparqlwrapper
#https://rdflib.github.io/sparqlwrapper/

from SPARQLWrapper import SPARQLWrapper, JSON
sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
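#SPARQLWrapper's constructor also accepts an agent= keyword if you want to send a descriptive
#User-Agent to the query service (assumes a reasonably recent SPARQLWrapper version)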
sparql.setQuery("""SELECT DISTINCT ?property ?propertyLabel
WHERE
{
  ?prop pq:P794 ?asObject .
  hint:Query hint:optimizer "None" .	
  ?item ?p ?prop . 
  ?property wikibase:claim ?p .  
  ?property wikibase:statementProperty ?ps .
  ?prop ?ps ?value .
  SERVICE wikibase:label { bd:serviceParam wikibase:language "en,bg"  }    
}
ORDER BY ASC(?property) 
LIMIT 1000""")
sparql.setReturnFormat(JSON)
results = sparql.query().convert()

print(len(results['results']['bindings']), 'properties found.')

migration_table = dict()

for result in results['results']['bindings']:

    property_uri = result['property']['value']
    property_label = result['propertyLabel']['value']

    base_uri, _, property_id = property_uri.rpartition('/') #repurposed from ItemPage.from_entity_uri
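    #e.g. 'http://www.wikidata.org/entity/P1234' splits into the base URI and property_id 'P1234'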

    property_query = """SELECT DISTINCT ?item
    WHERE
    {
      ?prop pq:P794 ?asObject .
      hint:Query hint:optimizer "None" .
      ?item ?p ?prop . 
      ?property wikibase:claim ?p .  
      ?property wikibase:statementProperty ?ps .
      ?prop ?ps ?value .
      wd:""" + property_id + """ wikibase:claim ?p .
      SERVICE wikibase:label { bd:serviceParam wikibase:language "en,yue,zh"  }    
    }
    ORDER BY ASC(?item) 
    LIMIT 10000"""

    num_of_items = SPARQLPageCounter(property_query, site=site)
    
    property_query = """SELECT DISTINCT ?item
    WHERE
    {
      ?prop pq:P794 ?item .
      hint:Query hint:optimizer "None" .
      ?mainitem ?p ?prop . 
      ?property wikibase:claim ?p .  
      ?property wikibase:statementProperty ?ps .
      ?prop ?ps ?value .
      wd:""" + property_id + """ wikibase:claim ?p .
      SERVICE wikibase:label { bd:serviceParam wikibase:language "en,yue,zh"  }    
    }
    ORDER BY ASC(?item) 
    LIMIT 1000"""

    num_of_targets = SPARQLPageCounter(property_query, site=site)
    
    print(property_id,  property_label, ':', num_of_items, 'items and', num_of_targets, 'targets')
    
    migration_table[property_id] = {'property_id': property_id, 
                                    'label': property_label, 
                                    'items': num_of_items,
                                    'targets': num_of_targets}
    
    
f = open('status_table.txt', 'w')
for k in migration_table:
    print('|-', file=f)
    print('| {{P|' + k + '}} ||', migration_table[k]['items'],
          '||', migration_table[k]['targets'], file=f)
f.close()