Wikidata:Database reports/without claims by site/Configuration

The most recent version of this script can be found at Github: noclaims. The script is run on Toolforge in the deltabot tool account; Toolforge users usually have read-access to all scripts in that tool account.

A potentially outdated version is stored onwiki at User:DeltaBot/source/noclaims in order to be permanently available and conveniently accessible; it is being displayed on this page with code formatting. Mind that the onwiki source code might have been slightly altered in order to prevent onlyinclude directives from being effective, and that the wikitext parser im some situations messes up the formatted output of the transcluding page content including the source code.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# licensed under CC-Zero: https://creativecommons.org/publicdomain/zero/1.0

from dataclasses import dataclass, field
from json.decoder import JSONDecodeError
from os.path import expanduser
from time import sleep, strftime
from typing import Generator

import mariadb
import pywikibot as pwb
import requests


# pywikibot config
SITE = pwb.Site('wikidata', 'wikidata')
REPO = SITE.data_repository()
EDIT_SUMMARY = 'Bot: Updating database report'

# wdqs config
WDQS_ENDPOINT = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql'
WDQS_USERAGENT = f'{requests.utils.default_user_agent()} (noclaims.py via User:DeltaBot at Wikidata; mailto:tools.deltabot@toolforge.org)'
WD = 'http://www.wikidata.org/entity/'

# Wikidata replica config
DB_DEFAULT_FILE = f'{expanduser("~")}/replica.my.cnf'
WIKIDATA_REPLICA_HOST = 'wikidatawiki.analytics.db.svc.wikimedia.cloud'
WIKIDATA_REPLICA_DB = 'wikidatawiki_p'

# templates for output
HEADER = 'A list of items with a sitelink to {lang_code} but without any statements. Data as of <only' + 'include>{timestamp}</onlyinclude>.\n\n'
TABLE_ROW = '* [[{qid}]] - [[{interwiki_prefix}{page_title}]]\n'
TABLE_ROW_OVERVIEW = '{{dbname}}}\n'
HEADER_OVERVIEW = '{{timestamp}}\n'
FOOTER_OVERVIEW = '</table> __NOINDEX__'

# queries
QUERY_1 = """SELECT
    CONVERT(ips_site_id USING utf8) AS dbname,
    COUNT(*) AS cnt
FROM
    wb_items_per_site
GROUP BY
    ips_site_id
ORDER BY
    cnt DESC"""

QUERY_2 = """SELECT
    COUNT(*) AS cnt
FROM
    page_props,
    wb_items_per_site,
    page
WHERE
    pp_sortkey=0
    AND pp_propname='wb-claims'
    AND pp_page=page_id
    AND CONCAT("Q", ips_item_id)=page_title
    AND page_namespace=0
    AND ips_site_id=%(dbname)s"""

SPARQL_QUERY = """SELECT ?item ?lemma WHERE {{
  ?sitelink schema:about ?item;
            schema:isPartOf <{url}/>;
            schema:name ?lemma .
  ?item wikibase:statements "0"^^xsd:integer .
}} ORDER BY DESC(xsd:integer(SUBSTR(STR(?item), STRLEN("http://www.wikidata.org/entity/Q")+1))) LIMIT 1000"""


INTERWIKI_MAP = {
    'wikipedia' : '',
    'wikibooks' : 'b:',
    'wikiquote' : 'q:',
    'wiktionary' : 'wikt:',
    'wikinews' : 'n:',
    'wikisource' : 's:',
    'wikiversity' : 'v:',
    'wikivoyage' : 'voy:',
    'wikidata' : 'd:',
    'commons' : 'c:',
    'meta' : 'm:',
    'species' : 'species:',
}


@dataclass
class Project:
    dbname:str

    # from meta_p.wiki
    lang:str = field(init=False)
    family:str = field(init=False)
    url:str = field(init=False)

    # for petscan
    ps_params:tuple = field(init=False)

    # for interwikilinks
    interwiki_prefix:str = field(init=False)


    def __post_init__(self) -> None:
        if self.dbname in [ 'mediawikiwiki', 'sourceswiki', 'ruwikimedia', 'sewikimedia', 'incubatorwiki', 'wikimaniawiki', 'outreachwiki' ]:
            raise RuntimeWarning(f'Project {self.dbname} is being ignored')

        self._init_meta_params()
        self.ps_params = self._init_petscan_params()
        self.interwiki_prefix = self._init_interwiki_prefix()


    def _init_meta_params(self) -> None:
        query = """SELECT lang, family, url FROM wiki WHERE is_closed=0 AND has_wikidata=1 AND dbname=%(dbname)s LIMIT 1"""
        with Replica('meta.analytics.db.svc.wikimedia.cloud', 'meta_p') as (_, cur):
            cur.execute(query, {'dbname' : self.dbname })
            if cur.rowcount != 1:
                raise RuntimeError(f'Found {cur.rowcount} results for dbname {self.dbname}')
            result = cur.fetchall()

        self.lang = result[0].get('lang')  # TODO: what about nb -> no?
        self.family = result[0].get('family')
        self.url = result[0].get('url')


    def _init_petscan_params(self) -> tuple[str, str]:
        if self.family in [ 'wikipedia', 'wikibooks', 'wikiquote', 'wiktionary', 'wikinews', 'wikisource', 'wikiversity', 'wikivoyage' ]:
            return (self.lang, self.family)

        if self.dbname=='wikidatawiki':
            return ('wikidata', 'wikimedia')

        if self.dbname=='specieswiki':
            return ('species', 'wikimedia')

        if self.dbname=='commonswiki':
            return ('commons', 'wikimedia')

        if self.dbname=='metawiki':
            return ('meta', 'wikimedia')

        raise RuntimeError(f'No petscan parameters found for project {self.dbname}')


    def _init_interwiki_prefix(self) -> str:
        if self.family in [ 'wikipedia', 'wikibooks', 'wikiquote', 'wiktionary', 'wikinews', 'wikisource', 'wikiversity', 'wikivoyage' ]:
            return f':{INTERWIKI_MAP.get(self.family, "")}{self.lang}:'

        if self.dbname == 'wikidatawiki':
            return ''

        if self.dbname == 'specieswiki':
            return ':species:'

        if self.dbname == 'commonswiki':
            return ':c:'

        if self.dbname == 'metawiki':
            return ':m:'

        raise RuntimeError(f'No interwiki prefix found for project {self.dbname}')


class Replica:
    def __init__(self, host:str=WIKIDATA_REPLICA_HOST, dbname:str=WIKIDATA_REPLICA_DB) -> None:
        self.connection = mariadb.connect(
            host=host,
            database=dbname,
            default_file=DB_DEFAULT_FILE,
        )
        self.cursor = self.connection.cursor(dictionary=True)

    def __enter__(self):
        return (self.connection, self.cursor)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.cursor.close()
        self.connection.close()


def query_wdqs(query:str, *, retry_credit:int=3) -> dict:
    if retry_credit <= 0:
        raise RuntimeError('No credit left for querying')

    response = requests.post(
        url=WDQS_ENDPOINT,
        data={
            'query' : query,
            'format' : 'json',
        },
        headers={
            'Accept' : 'application/sparql-results+json',
            'User-Agent': WDQS_USERAGENT,
        }
    )

    try:
        payload = response.json()
    except JSONDecodeError as exception:
        if response.elapsed.total_seconds() > 55 and retry_credit-1 > 0:
            sleep(60)
            return query_wdqs(query, retry_credit=retry_credit-1)

        raise RuntimeError('Cannot parse result from SPARQL endpoint') from exception

    return payload


def query_wdqs_generator(query:str) -> Generator[dict, None, None]:
    data = query_wdqs(query)

    for row in data.get('results', {}).get('bindings', []):
        yield row


def make_report(project:Project) -> str:
    text = ''

    for row in query_wdqs_generator(SPARQL_QUERY.format(url=project.url)):
        qid = row.get('item', {}).get('value', '').replace(WD, '')
        page_title = row.get('lemma', {}).get('value', '')
        text += TABLE_ROW.format(
            qid=qid,
            interwiki_prefix=project.interwiki_prefix,
            page_title=page_title
        )

    return text


def make_overview() -> tuple[str, int, int]:
    with Replica() as (_, cur):
        cur.execute(QUERY_1)
        results = cur.fetchall()

    idx = 0
    maxvalue = 0
    text = ''

    for row in results:
        dbname = row.get('dbname')
        cnt = row.get('cnt')
        if dbname is None or cnt is None:
            continue

        print(dbname, cnt)
        try:
            project = Project(dbname)
        except (RuntimeError, RuntimeWarning) as exception:
            print(exception)
            continue

        idx += 1
        with Replica() as (_, cur):
            cur.execute(QUERY_2, { 'dbname' : dbname })

            for cnt_without in cur:
                cnt = cnt_without.get('cnt')
                if cnt is None:
                    continue

                if cnt > maxvalue:
                    maxvalue = cnt

                text += TABLE_ROW_OVERVIEW.format(
                    dbname=dbname,
                    lang=project.lang,
                    family=project.family,
                    url=project.url,
                    iw=project.interwiki_prefix,
                    ps_lang=project.ps_params[0],
                    ps_family=project.ps_params[1],
                    cnt_without=cnt,
                    idx=idx,
                    cnt=cnt
                )

    return text, idx, maxvalue


def main():
    for dbname in [ 'dewiki', 'enwiki', 'eowiki', 'etwiki', 'frwiki', 'jawiki', 'nlwiki', 'ptwiki', 'ruwiki', 'svwiki', 'huwiki', 'simplewiki' ]:
        project = Project(dbname)

        report = make_report(project)
        text = f'{HEADER.format(lang_code=project.lang, timestamp=strftime("%Y-%m-%d %H:%M (%Z)"))}{report}'

        page = pwb.Page(SITE, f'Wikidata:Database reports/without claims by site/{dbname}')
        page.text = text
        page.save(summary=EDIT_SUMMARY, minor=False)


    report, idx, max_value = make_overview()
    stat = f'{max={max_value}}}\n'
    text = stat + HEADER_OVERVIEW.format(timestamp=strftime("%Y-%m-%d %H:%M (%Z)")) + report + FOOTER_OVERVIEW
    summary = f'Bot: Updating database report: reportlength: {idx}; max: {max_value}'

    page = pwb.Page(SITE, 'Wikidata:Database reports/without claims by site')
    page.text = text
    page.save(summary=summary, minor=False)


if __name__=='__main__':
    main()