#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
"""
Usage:
Many parameters to one claim; modified version of harvest_template
The script NEEDS to be changed manually for every task
Adding coordinates:
python pwb.py many_to_one -namespace:0 -family:wikipedia -lang:en -transcludes:'Infobox South African town' -template:'Infobox South African town' P625 latd latm lats S longd longm longs E
Adding qualifiers:
python pwb.py many_to_one -namespace:0 -family:wikipedia -lang:en -links:'Template:Popes' -template:'Infobox Christian leader' P582 term_end P39 type -qualifiers
The -overwrite argument allows to overwrite existing claims
¶ms;
"""
#
# (C) Multichill, Amir, 2013
# (C) Pywikibot team, 2013
#
# Distributed under the terms of MIT License.
#
__version__ = '$Id: 2507544f311b7164e04c7c83198a891f33e9f8ee $'
#
import re
import json
import pywikibot
from pywikibot import pagegenerators as pg
from datetime import datetime
from datetime import timedelta
# Substitution map used by pywikibot's help system: expands the ¶ms;
# placeholder in the module docstring with the standard pagegenerators help.
docuReplacements = {'¶ms;': pywikibot.pagegenerators.parameterHelp}
class HarvestRobot:
    """
    A bot to add Wikidata claims.

    Harvests up to eight fields from one template on each generated page
    and combines them into a single claim (or a qualifier) on the page's
    Wikidata item.
    """
def __init__(self, generator, templateTitle, fields, overwrite=False, isQualifier=False):
    """
    Constructor.

    Arguments:
    * generator - iterable yielding Page objects to work on
    * templateTitle - title of the template to harvest
    * fields - ordered list: property id first, then template field names
    * overwrite - whether existing claims may be overwritten
    * isQualifier - whether the harvested value is added as a qualifier
    """
    self.generator = generator
    self.fields = fields
    self.overwrite = overwrite
    self.isQualifier = isQualifier
    # Underscores and spaces are interchangeable in template titles.
    self.templateTitle = templateTitle.replace(u'_', u' ')
    # TODO: Make it a list which also includes the redirects to the template
    self.repo = pywikibot.Site().data_repository()
    self.cacheSources()
def getSource(self, lang):
"""
Get the source for the specified language,
if possible
"""
if lang in self.source_values:
source = pywikibot.Claim(self.repo, 'p143')
source.setTarget(self.source_values.get(lang))
return source
def cacheSources(self):
    """
    Fetch the per-language source items from the on-wiki JSON list
    (Wikidata:List of wikis/python) and cache them in
    self.source_values as ItemPage objects keyed by language code.
    """
    listPage = pywikibot.Page(self.repo, u'Wikidata:List of wikis/python')
    wikis = json.loads(listPage.get())['wikipedia']
    self.source_values = {
        source_lang: pywikibot.ItemPage(self.repo, item_id)
        for source_lang, item_id in wikis.items()
    }
def run(self):
    """
    Start the robot.

    Resolves template redirects once, then processes every page from
    the generator; per-page failures are logged (with traceback) but
    not propagated, so one bad page cannot stop the whole run.
    """
    self.templateTitles = self.getTemplateSynonyms(self.templateTitle)
    for i, page in enumerate(self.generator):
        try:
            self.procesPage(i, page)
        except Exception:
            # BUG FIX: drop the unused 'as e' binding; pywikibot.exception()
            # picks up the active exception from sys.exc_info itself.
            pywikibot.exception(tb=True)
def newItem(self, page, item):
    """
    Create a Wikidata item for *page* where none exists
    (adapted from newitem.py by Multichill).

    Pages are skipped when they are redirects, user pages, edited or
    created too recently, or already carry interlanguage links (an
    item might already exist elsewhere).
    """
    # Minimum age in days of the page itself and of its last edit.
    self.pageAge = 21
    self.pageAgeBefore = self.repo.getcurrenttime() - timedelta(days=self.pageAge)
    self.lastEdit = 7
    self.lastEditBefore = self.repo.getcurrenttime() - timedelta(days=self.lastEdit)
    if page.isRedirectPage():
        pywikibot.output('%s is a redirect page. Skipping.' % page)
    elif page.namespace() == 2:
        pywikibot.output('%s is a user page. Skipping.' % page)
    elif page.editTime() > self.lastEditBefore:
        pywikibot.output('Last edit on %s was on %s. Too recent. Skipping.' % (page, page.editTime().isoformat()))
    else:
        # The oldest revision is the page creation.
        (revId, revTimestamp, revUser, revComment) = page.getVersionHistory(reverseOrder=True, total=1)[0]
        if revTimestamp > self.pageAgeBefore:
            # BUG FIX: report the creation timestamp, not the last-edit time.
            pywikibot.output('Page creation of %s on %s is too recent. Skipping.' % (page, revTimestamp.isoformat()))
        elif page.langlinks():
            # FIXME: Implement this
            pywikibot.output('Found language links (interwiki links). Haven\'t implemented that yet so skipping.')
        else:
            # FIXME: i18n
            summary = u'Bot: New item with sitelink from %s' % (page.title(asLink=True, insite=self.repo), )
            data = {'sitelinks':
                    {item.getdbName(page.site):
                     {'site': item.getdbName(page.site),
                      'title': page.title()}
                     },
                    'labels':
                    {page.site.lang:
                     {'language': page.site.lang,
                      'value': page.title()}
                     }
                    }
            pywikibot.output(summary)
            item.editEntity(data, summary=summary)
def getTemplateSynonyms(self, title):
    """
    Return the template's title plus the titles of every redirect
    pointing at it, so pages using any synonym are recognised.
    """
    # This query can take a while, so say something first.
    pywikibot.output('Finding redirects...')
    templatePage = pywikibot.Page(pywikibot.Site(), title, ns=10)
    if templatePage.isRedirectPage():
        templatePage = templatePage.getRedirectTarget()
    redirects = templatePage.getReferences(redirectsOnly=True, namespaces=[10], follow_redirects=False)
    synonyms = [redirect.title(withNamespace=False) for redirect in redirects]
    synonyms.append(templatePage.title(withNamespace=False))
    return synonyms
def extractUrl(self, page, value, claim):
    """
    Extract a URL from a template field value and set it as the
    claim target.

    Tries, in order:
    * {{URL|http://www.example.com}} or [http://www.example.com Example]
    * {{URL|www.example.com}} - "http://" is prepended
    * {{URL|example.com}} - "http://www." is prepended

    Raises AttributeError (via match.group on None) when no pattern
    matches; the caller treats that as "no valid URL".
    """
    # BUG FIX: raw strings - the originals contained invalid escape
    # sequences such as "\{" (a py3 DeprecationWarning/SyntaxWarning).
    match = re.search(r"(?:\{\{|\[)\w*\|*(http[^|\s]+).*(?:\}\}|\])", value)
    if match is None:
        match = re.search(r"{{\w*\|(www\S+)}}", value)
        if match is None:
            match = re.search(r"{{\w*\|(\S+)}}", value)
            extractedUrl = "http://www." + match.group(1)
        else:
            extractedUrl = "http://" + match.group(1)
    else:
        extractedUrl = match.group(1)
    claim.setTarget(extractedUrl)
    pywikibot.output('Extracting %s --> %s' % (value, extractedUrl))
def monthToNumber(self, page, month):
"""
Returns an integer from the month name
"""
languageEdition = page.site.language()
if languageEdition == "af": # Afrikaans
month_dictio = {"Januarie": "1", "Februarie": "2", "Maart": "3", "April": "4",
"Mei": "5", "Junie": "6", "Julie": "7", "Augustus": "8",
"September": "9", "Oktober": "10", "November": "11", "Desember": "12"}
elif languageEdition == "cs": # Česky (Czech)
month_dictio = {"leden": "1", "únor": "2", "březen": "3", "duben": "4",
"květen": "5", "červen": "6", "červenec": "7", "srpen": "8",
"září": "9", "říjen": "10", "listopad": "11", "prosinec": "12",
"ledna": "1", "února": "2", "března": "3", "dubna": "4",
"května": "5", "června": "6", "července": "7", "srpna": "8",
"října": "10", "listopadu": "11", "prosince": "12"}
elif languageEdition == "de": # Deutsch
month_dictio = {"Januar": "1", "Februar": "2", "März": "3", "April": "4",
"Mai": "5", "Juni": "6", "Juli": "7", "August": "8",
"September": "9", "Oktober": "10", "November": "11", "Dezember": "12"}
elif languageEdition == "fr": # Français
month_dictio = {"janvier": "1", "février": "2", "mars": "3", "avril": "4",
"mai": "5", "juin": "6", "juillet": "7", "août": "8",
"septembre": "9", "octobre": "10", "novembre": "11", "décembre": "12"}
else: # English (by default)
month_dictio = {"January": "1", "February": "2", "March": "3", "April": "4",
"May": "5", "June": "6", "July": "7", "August": "8",
"September": "9", "October": "10", "November": "11", "December": "12"}
if month in month_dictio.keys():
for monthName, monthNumber in month_dictio.items():
month = month.replace(monthName, monthNumber)
month = int(month)
else:
try:
month = int(month)
except ValueError:
print month + " is not a valid month"
month = None
return month
def extractTime(self, page, value, claim):
    """
    Extract a date from a template field value and set it as the
    claim target (a pywikibot.WbTime).

    Tries several date formats in a fixed order; month names are
    resolved through monthToNumber for the page's language. Raises
    AttributeError (via match.group on None) when nothing matches;
    the caller treats that as "no valid time".
    """
    extractedMonth = None
    extractedDay = None
    # BUG FIX: an ordered list instead of a dict - the specific
    # patterns (templates) must be tried before the generic ones,
    # and dict iteration order was arbitrary on Python 2.
    # Raw strings fix the invalid escape sequences too.
    regexList = [
        (r"\{\{[dD]ate.*?\|(\d{1,2})\|(\w*)\|(\d{1,4})", "ddMMyyyy"),  # French templates, format 1 January 2000
        (r"(\d{1,2})\.?\s(\w*)\]{0,2}\s\[{0,2}(\d{1,4})", "ddMMyyyy"),  # format 1 January 2000
        (r"(\w*)\.?\s(\d{1,2}),\s(\d{1,4})", "MMddyyyy"),  # format January 1, 2000
        (r"\{\{\w(?:irth|eath|tart|ilm).*?(\d{1,4})\|?(\d{0,2})\|?(\d{0,2})", "yyyymmdd"),  # English templates, format 2000 01 01
        (r"([\d,]+)(?:\s*| )(?:BC|bc|av. J)", "yyyy BC"),  # years BC, format 1000 BCE
    ]
    for regex, timeFormat in regexList:
        match = re.search(regex, value, re.UNICODE)
        if match is not None:
            if timeFormat == "ddMMyyyy":  # day, month name, year
                extractedMonth = self.monthToNumber(page, match.group(2))
                if extractedMonth is None:
                    match = None
                    continue
                else:
                    extractedYear = int(match.group(3))
                    extractedDay = int(match.group(1))
                    break
            elif timeFormat == "MMddyyyy":  # month name, day, year
                extractedMonth = self.monthToNumber(page, match.group(1))
                if extractedMonth is None:
                    match = None
                    continue
                else:
                    extractedYear = int(match.group(3))
                    extractedDay = int(match.group(2))
                    break
            elif timeFormat == "yyyymmdd":  # year, month, day
                extractedYear = int(match.group(1))
                extractedMonth = int(match.group(2))
                extractedDay = int(match.group(3))
                break
            elif timeFormat == "yyyy BC":  # year BCE
                # BUG FIX: negative year as an int (WbTime needs a
                # number, not "-1000"), and break so a later pattern
                # cannot clobber the result (the original fell through).
                extractedYear = -int(match.group(1).replace(",", ""))  # remove commas, if any
                break
    if match is None:
        # Last resort: a bare (possibly negative) year, but not an
        # ordinal like "20th".
        match = re.search(r"^\[{0,2}(-?\d{3,4})(?!\d*(st|nd|rd|th))", value)
        extractedYear = int(match.group(1))
    timeclaim = pywikibot.WbTime(year=extractedYear, month=extractedMonth, day=extractedDay)
    claim.setTarget(timeclaim)
    pywikibot.output('Extracting %s --> %s-%s-%s' % (value, extractedYear, extractedMonth, extractedDay))
def addAsQualifier(self, qualifier, item, page, baseClaim_pID, baseClaim_field):
    """
    Not implemented; only works if baseClaimDictio is specified manually
    and addAsQualifier is called somewhere within procesPage

    Adds *qualifier* to a base claim (property baseClaim_pID whose
    target is the item of the wiki page titled baseClaim_field),
    creating or - with -overwrite - updating the base claim first,
    then sourcing the claim via P143 when possible.
    """
    baseClaimDictio = {"pID" : baseClaim_pID, "field": baseClaim_field}
    # Resolve the named page (following redirects) to its Wikidata item;
    # that item is the target of the base claim.
    wikipage = pywikibot.Page(page.site, baseClaimDictio["field"])
    if wikipage.isRedirectPage():
        wikipage = wikipage.getRedirectTarget()
    itemID = pywikibot.ItemPage.fromPage(wikipage)
    baseClaim = pywikibot.Claim(self.repo, baseClaimDictio["pID"])
    baseClaim.setTarget(itemID)
    if baseClaim.getID() in item.get().get('claims'):
        if self.overwrite == True:
            # overwrite: reuse an existing claim with the same target if
            # one exists, otherwise retarget the last differing claim.
            baseClaimToChange = item.claims[baseClaim.getID()]
            baseClaimIsSame = False
            for existingClaim in baseClaimToChange:
                if baseClaim.getTarget() == existingClaim.getTarget():
                    baseClaimIsSame = True
                    break
                else:
                    valueToChange = existingClaim.getTarget()
                    valueNew = baseClaim.getTarget()
            if baseClaimIsSame == False:
                # NOTE(review): existingClaim/valueToChange are the values
                # left over from the last loop iteration.
                pywikibot.output('Changing %s --> %s' % (valueToChange, valueNew))
                existingClaim.changeTarget(valueNew)
                baseClaim = existingClaim
        else:
            # add qualifier to existing claim
            item.get()
            baseClaim = item.claims[baseClaim.getID()][0]
    else:
        try:
            item.addClaim(baseClaim)
        except pywikibot.exceptions.NoPage:
            pywikibot.output('%s doesn\'t exist so I can\'t link to it' % (baseClaimDictio["field"]))
            return
    if qualifier.getID() in baseClaim.qualifiers.keys():
        if self.overwrite == True:
            # overwrite the first existing qualifier of this property
            qualifierToChange = baseClaim.qualifiers[qualifier.getID()][0]
            valueToChange = qualifierToChange.getTarget()
            valueNew = qualifier.getTarget()
            try:
                pywikibot.output('Changing %s --> %s' % (valueToChange, valueNew))
                self.repo.editQualifier(baseClaim, qualifier)
            except pywikibot.data.api.APIError:
                pywikibot.output('Old value same as new value')
        else:
            print ("%s already exists for %s" % (qualifier.getID(), baseClaim.getTarget()))
    else:
        print ("Adding %s as qualifier for %s" % (qualifier.getID(), baseClaim.getTarget()))
        baseClaim.addQualifier(qualifier)
    # Source the base claim ("imported from" this language edition),
    # unless an identical source is already attached.
    source = self.getSource(page.site.language())
    if source:
        for existingSources in baseClaim.sources:
            if existingSources[u'P143'][0] == source:
                return  # the source was already set
        baseClaim.addSource(source, bot=True)
def procesPage(self, index, page):
    """
    Proces a single page.

    Finds the harvested template on *page*, collects up to eight of
    its fields into valuesDictio, builds a claim of the property's
    datatype from them and adds it to the page's item - either as a
    normal claim (sourced via P143) or, with -qualifiers, as a
    qualifier of a base claim.

    Arguments:
    * index - running number, used for progress output only
    * page - the wiki Page to harvest
    """
    item = pywikibot.ItemPage.fromPage(page)
    pywikibot.output(u'Processing No. %s: %s' % (index, page))
    if not item.exists():
        # create the page
        self.newItem(page, item)
        item = pywikibot.ItemPage.fromPage(page)
        if not item.exists():
            # The item was not created
            return
    claim = pywikibot.Claim(self.repo, self.fields[0])
    # Check if the property isn't already set
    if self.overwrite == False:
        if claim.getID() in item.get().get('claims').keys():
            pywikibot.output(
                u'%s already exists (-overwrite to change it)'
                % claim.getID())
            return
    # TODO FIXME: This is a very crude way of dupe
    # checking
    # BUG FIX: compare with != instead of 'is not'; identity tests on
    # int literals are implementation-dependent.
    if str(item.getVersionHistory(total=50)).find("wbremoveclaims-remove:1| */ [[Property:" + claim.getID()) != -1:
        pywikibot.output('%s cannot be added as it was recently removed from the item' % (claim.getID(),))
        return
    pagetext = page.get()
    templates = pywikibot.extract_templates_and_params(pagetext)
    for (template, fielddict) in templates:
        valuesDictio = {}
        # Clean up template
        template = pywikibot.Page(page.site, template,
                                  ns=10).title(withNamespace=False)
        # We found the template we were looking for
        if template in self.templateTitles:
            # Collect the (up to eight) fields of interest, keyed by
            # their position in self.fields.
            for field, value in fielddict.items():
                field = field.strip()
                value = value.strip()
                if value != "":  # must not be an empty string
                    if field == self.fields[1]:
                        valuesDictio["One"] = value
                        continue
                    elif len(self.fields) > 2 and field == self.fields[2]:
                        valuesDictio["Two"] = value
                        continue
                    elif len(self.fields) > 3 and field == self.fields[3]:
                        valuesDictio["Three"] = value
                        continue
                    elif len(self.fields) > 4 and field == self.fields[4]:
                        valuesDictio["Four"] = value
                        continue
                    elif len(self.fields) > 5 and field == self.fields[5]:
                        valuesDictio["Five"] = value
                        continue
                    elif len(self.fields) > 6 and field == self.fields[6]:
                        valuesDictio["Six"] = value
                        continue
                    elif len(self.fields) > 7 and field == self.fields[7]:
                        valuesDictio["Seven"] = value
                        continue
                    elif len(self.fields) > 8 and field == self.fields[8]:
                        valuesDictio["Eight"] = value
            try:
                # Strip any trailing template from the main field and
                # wrap it as a wiki link for the item-type branch below.
                match = re.search(r"([^\{]*)", valuesDictio["One"])
                value = "[[" + match.group(1) + "]]"
            except KeyError:
                print("%s not found" % self.fields[1])
                return
            if claim.getType() == 'wikibase-item':
                # Try to extract a valid page
                match = re.search(pywikibot.link_regex, value)
                if match is None:
                    pywikibot.output('No valid item found for %s' % (claim.getID(),))
                    continue
                else:
                    try:
                        link = pywikibot.Link(match.group(1))
                        linkedPage = pywikibot.Page(link)
                        if linkedPage.isRedirectPage():
                            linkedPage = linkedPage.getRedirectTarget()
                        linkedItem = pywikibot.ItemPage.fromPage(linkedPage)
                        claim.setTarget(linkedItem)
                        if not linkedItem.title():
                            print(" ")  # this is only to raise NoPage
                        # avoid adding disambiguation pages
                        if linkedPage.isDisambig():
                            pywikibot.output('%s is a disambiguation page. Skipping.' % (linkedPage,))
                            continue
                    except pywikibot.exceptions.NoPage:
                        pywikibot.output('%s doesn\'t exist so I can\'t link to it' % (linkedPage,))
                        continue
            elif claim.getType() == 'string':
                if value == "":
                    pywikibot.output('No valid string found for %s' % (claim.getID(),))
                    continue
                else:
                    match = re.search(r"^(?!(?:\{|\[http))\W*([^<>]+)(?<![<>])", value, re.UNICODE)  # avoid tags, links and templates
                    if match:
                        value = match.group(1)
                        claim.setTarget(value.strip())
                    else:
                        pywikibot.output('%s includes forbidden characters' % (value,))
                        continue
            elif claim.getType() == 'url':
                try:
                    self.extractUrl(page, value, claim)
                except AttributeError:
                    pywikibot.output('No valid URL found for %s' % (claim.getID(),))
                    continue
            elif claim.getType() == 'time':
                try:
                    self.extractTime(page, value, claim)
                except AttributeError:
                    pywikibot.output('No valid time for %s' % (claim.getID(),))
                    continue
            elif claim.getType() == 'commonsMedia':
                mediasite = pywikibot.Site("commons", "commons")
                if value == "":
                    pywikibot.output('No valid media file found for %s' % (claim.getID(),))
                    continue
                else:
                    match = re.search(r"\w{4,5}:?([^|]*)\|", value)  # extract filename
                    if match is not None:
                        value = "File:" + match.group(1)
                    # check if the image exists on Commons
                    image = pywikibot.ImagePage(mediasite, value)
                    if image.exists() == False:
                        value = "File:" + value  # try the same query with the namespace set
                        image = pywikibot.ImagePage(mediasite, value)
                        if image.exists() == False:
                            pywikibot.output('%s does not exist on Commons' % (value,))
                            continue
                    image = pywikibot.ImagePage(mediasite, value)
                    claim.setTarget(image)
            elif claim.getType() == 'globecoordinate':
                if ("One" not in valuesDictio.keys()) or ("Five" not in valuesDictio.keys()):  # at least lat/long deg. must exist
                    print("No coordinates found")
                    continue
                if ("Four" not in valuesDictio.keys()) or ("Eight" not in valuesDictio.keys()):  # at least lat/long deg. must exist
                    print("Hemisphere flag missing")
                    latNS = 1  # MUST be changed every time
                    latEW = 1
                    # continue
                else:
                    if valuesDictio["Four"] == "S":
                        latNS = -1
                    elif valuesDictio["Four"] == "N":
                        latNS = 1
                    if valuesDictio["Eight"] == "W":
                        latEW = -1
                    elif valuesDictio["Eight"] == "E":
                        latEW = 1
                if ("Two" not in valuesDictio.keys()) or ("Six" not in valuesDictio.keys()):  # lat/long min
                    valuesDictio["Two"] = valuesDictio["Six"] = 0
                if ("Three" not in valuesDictio.keys()) or ("Seven" not in valuesDictio.keys()):  # lat/long sec
                    valuesDictio["Three"] = valuesDictio["Seven"] = 0
                latitude = latNS * (float(valuesDictio["One"]) + (float(valuesDictio["Two"]) * float(1)/60) + (float(valuesDictio["Three"]) * float(1)/60 * float(1)/60))
                longitude = latEW * (float(valuesDictio["Five"]) + (float(valuesDictio["Six"]) * float(1)/60) + (float(valuesDictio["Seven"]) * float(1)/60 * float(1)/60))
                coordinate = pywikibot.Coordinate(lat=latitude, lon=longitude, precision=0.001)
                claim.setTarget(coordinate)
            else:
                pywikibot.output("%s is not a supported datatype." % claim.getType())
                continue
            if self.isQualifier == True:
                try:
                    baseClaim_pID = self.fields[2]
                    baseClaim_field = valuesDictio["Three"]
                    match = re.search(pywikibot.link_regex, baseClaim_field)
                    if match is not None:
                        baseClaim_field = match.group(1)
                    else:
                        pywikibot.output('No valid item found for %s' % (self.fields[3]))
                        return
                    self.addAsQualifier(claim, item, page, baseClaim_pID, baseClaim_field)
                except KeyError:
                    print("%s not found" % self.fields[3])
                    return
            else:
                if claim.getID() in item.get().get('claims'):
                    # overwrite (we can only get here with -overwrite,
                    # the non-overwrite case returned earlier)
                    item.get()
                    claimToChange = item.claims[claim.getID()][0]
                    valueToChange = claimToChange.getTarget()
                    valueNew = claim.getTarget()
                    # if the new and old claims are the same
                    if valueToChange != valueNew:
                        pywikibot.output('Changing %s --> %s' % (valueToChange, valueNew))
                        claimToChange.changeTarget(valueNew)
                    else:
                        pywikibot.output('Old value %s same as new value %s' % (valueToChange, valueNew))
                else:
                    if item == claim.getTarget():
                        print("The target and the item are identical, skipping")
                    else:
                        pywikibot.output('Adding %s --> %s' % (claim.getID(), claim.getTarget()))
                        item.addClaim(claim)
                        # A generator might yield pages from multiple sites
                        source = self.getSource(page.site.language())
                        if source:
                            claim.addSource(source, bot=True)
def main():
    """
    Parse the command line, build the page generator and run the bot.

    Recognised options (besides the standard pagegenerators ones):
    * -template:Title - the template to work on (prompted for if empty)
    * -overwrite - allow overwriting existing claims
    * -qualifiers - add harvested values as qualifiers
    All remaining arguments are the property id and template field names.

    Raises ValueError when no template was specified.
    """
    gen = pg.GeneratorFactory()
    commandline_arguments = list()
    templateTitle = u''
    overwrite = False
    isQualifier = False
    for arg in pywikibot.handleArgs():
        if arg.startswith('-template'):
            if len(arg) == 9:
                templateTitle = pywikibot.input(
                    u'Please enter the template to work on:')
            else:
                templateTitle = arg[10:]
        elif arg.startswith('-overwrite'):
            overwrite = True
        elif arg.startswith('-qualifiers'):
            isQualifier = True
        elif gen.handleArg(arg):
            continue
        else:
            commandline_arguments.append(arg)
    if not templateTitle:
        # BUG FIX: give the error a message instead of a bare ValueError.
        raise ValueError('A template has to be specified with -template')
    fields = commandline_arguments
    generator = gen.getCombinedGenerator()
    if not generator:
        # No generator given on the command line: fall back to all main
        # namespace pages that transclude the template.
        transclusionPage = pywikibot.Page(
            pywikibot.Link(
                templateTitle, defaultNamespace=10, source=pywikibot.Site()
            )
        )
        generator = pywikibot.Site().page_embeddedin(
            transclusionPage, filterRedirects=None,
            namespaces=0, step=None, total=None, content=False
        )
    bot = HarvestRobot(generator, templateTitle, fields, overwrite, isQualifier)
    bot.run()


if __name__ == "__main__":
    main()