User:FLOSSbot/repository-protocol

#!/usr/bin/env python3
import os
import re
import shlex
import tempfile
import time

import pywikibot
import requests
from pywikibot import pagegenerators as pg

# SPARQL query selecting every item that has at least one P1324 (source code
# repository) statement with no P2700 (protocol) qualifier, ordered by item.
# The last line of the query text is a SPARQL comment holding the current
# timestamp, so the query string differs on every run (presumably to keep the
# query service from returning a cached result -- TODO confirm).
QUERY = """
SELECT DISTINCT ?item WHERE {
  ?item p:P1324 ?repo.   # for all source code repository statements
  ?repo ps:P1324 ?value. # that are not null
  OPTIONAL { ?repo pq:P2700 ?protocol } # try to get the protocol
  FILTER(!BOUND(?protocol)) # and only keep those with no protocol
} ORDER BY ?item
# """ + str(time.time())

# Site handle passed to every item, claim and query created by this script.
wikidata_site = pywikibot.Site("wikidata", "wikidata")
# Items used as the value of the protocol qualifier, one per access method
# the script knows how to verify.
# NOTE(review): the Q-ids are trusted to match the variable names; confirm
# against Wikidata before changing any of them.
Q_git = pywikibot.ItemPage(wikidata_site, "Q186055", 0)
Q_svn = pywikibot.ItemPage(wikidata_site, "Q46794", 0)
Q_hg = pywikibot.ItemPage(wikidata_site, "Q476543", 0)
Q_fossil = pywikibot.ItemPage(wikidata_site, "Q1439431", 0)
Q_bzr = pywikibot.ItemPage(wikidata_site, "Q812656", 0)
Q_cvs = pywikibot.ItemPage(wikidata_site, "Q467252", 0)
Q_http = pywikibot.ItemPage(wikidata_site, "Q8777", 0)
# Property ids: the qualifier that records the protocol, and the statement
# that holds the repository URL (the same ids appear in QUERY).
P_protocol = "P2700"
P_source_code_repository = "P1324"


def guess_protocol_from_url(url):
    """Guess a repository's protocol from the text of its URL alone.

    Nothing is fetched.  The rules are tried in order and the first one that
    matches decides the result.

    :param url: repository URL as a string
    :returns: one of the module-level ``Q_*`` protocol items, or ``None``
        when no rule matches
    """
    if 'github.com' in url:
        return Q_git
    if 'code.launchpad.net' in url:
        return Q_bzr
    if url.lower().startswith(('http://bxr.su/', 'http://openbsd.su/')):
        return Q_http
    # Web pages that browse a repository rather than being one.  Raw strings
    # keep ``\?`` a regex escape instead of an invalid string escape, and the
    # dots in host names are escaped so they only match a literal dot.
    web_viewer_patterns = (
        r'https?://sourceforge\.net/p/.*/(svn|code|code-0)/HEAD/tree/',
        r'https?://sourceforge\.net/p/.*/(git|code|code-git)/ci/(default|master)/tree/',
        r'https?://.*\.codeplex\.com/SourceControl',
        r'http://svn\.savannah\.gnu\.org/viewvc/\?root=',
        r'https://svn\.tuxfamily\.org/viewvc\.cgi/',
    )
    if any(re.match(pattern, url) for pattern in web_viewer_patterns):
        return Q_http
    if url.startswith('git://'):
        return Q_git
    if url.startswith('svn://'):
        return Q_svn
    return None


def verify_git(url):
    """Check whether *url* answers as a git repository.

    Runs ``git ls-remote <url> HEAD`` through the shell, limited to 30
    seconds by ``timeout``, and prints the command before running it.

    :param url: repository URL to probe
    :returns: ``True`` when the command exits with status 0, else ``False``
    """
    # The URL is read from Wikidata statements, i.e. external data: quote it
    # so shell metacharacters in it cannot run additional commands.
    cmd = "timeout 30 git ls-remote " + shlex.quote(url) + " HEAD"
    print("git verification: " + cmd)
    return os.system(cmd) == 0


def verify_hg(url):
    """Check whether *url* answers as a Mercurial repository.

    Runs ``hg identify <url>`` through the shell and prints the command
    before running it.

    :param url: repository URL to probe
    :returns: ``True`` when the command exits with status 0, else ``False``
    """
    # The URL is read from Wikidata statements, i.e. external data: quote it
    # so shell metacharacters in it cannot run additional commands.
    cmd = """
set -e
hg identify {url}
""".format(url=shlex.quote(url))
    print("hg verification: " + cmd)
    return os.system(cmd) == 0


def verify_svn(url):
    """Check whether *url* answers as a Subversion repository.

    Starts ``svn co <url>`` (limited to 30 seconds by ``timeout``) inside a
    scratch directory and succeeds as soon as the output contains a line
    reporting an added path (``A    ...``).

    :param url: repository URL to probe
    :returns: ``True`` when the shell pipeline exits with status 0, else
        ``False``
    """
    # A private scratch directory per call, removed on exit, instead of a
    # fixed /tmp/tmpclone: no leftover checkout and no clash with another
    # run or another user of the machine.
    with tempfile.TemporaryDirectory() as workdir:
        # The URL is external data: quote it so shell metacharacters in it
        # cannot run additional commands.
        cmd = """
set -e
cd {workdir}
timeout 30 svn co {url} | grep -q -m 1 -e 'A    '
""".format(workdir=shlex.quote(workdir), url=shlex.quote(url))
        print("svn verification: " + cmd)
        return os.system(cmd) == 0


def verify_fossil(url):
    """Check whether *url* answers as a Fossil repository.

    Starts ``fossil clone <url> repo.fossil`` (limited to 30 seconds by
    ``timeout``) inside a scratch directory and succeeds as soon as the
    output mentions ``Round-trips``.

    :param url: repository URL to probe
    :returns: ``True`` when the shell pipeline exits with status 0, else
        ``False``
    """
    # A private scratch directory per call, removed on exit, instead of a
    # fixed /tmp/tmpclone.  The clone destination is a new file inside it:
    # ``fossil clone`` writes a repository file and refuses a destination
    # that already exists, so the scratch directory itself cannot be used.
    with tempfile.TemporaryDirectory() as workdir:
        # The URL is external data: quote it so shell metacharacters in it
        # cannot run additional commands.
        cmd = """
set -e
cd {workdir}
timeout 30 fossil clone {url} repo.fossil | grep -q -m 1 -e 'Round-trips'
""".format(workdir=shlex.quote(workdir), url=shlex.quote(url))
        print("fossil verification: " + cmd)
        return os.system(cmd) == 0


def verify_bzr(url):
    """Check whether *url* answers as a Bazaar branch.

    Runs ``bzr version-info <url>`` through the shell and prints the command
    before running it.

    :param url: repository URL to probe
    :returns: ``True`` when the command exits with status 0, else ``False``
    """
    # The URL is read from Wikidata statements, i.e. external data: quote it
    # so shell metacharacters in it cannot run additional commands.
    cmd = """
set -e
bzr version-info {url}
""".format(url=shlex.quote(url))
    print("bzr verification: " + cmd)
    return os.system(cmd) == 0

def verify_http(url):
    """Check whether a GET on *url* is answered with HTTP 200.

    :param url: URL to fetch
    :returns: ``True`` when the final response status is 200; ``False`` for
        any other status or when the request fails (unreachable host,
        invalid URL, timeout, ...)
    """
    try:
        # Without a timeout requests waits indefinitely on a silent server;
        # 30 seconds matches the `timeout 30` used by the shell verifiers.
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        # A URL that cannot be fetched has not been verified.
        return False
    return response.status_code == requests.codes.ok

def verify_protocol(url, protocol):
    """Run the verifier that belongs to *protocol* against *url*.

    :param url: repository URL to probe
    :param protocol: one of the module-level ``Q_*`` protocol items
    :returns: the verifier's result, or ``None`` when *protocol* is not one
        of git, hg, fossil, bzr, svn or http
    """
    # Compared in this order with ``==``; the first equal item decides.
    verifiers = (
        (Q_git, verify_git),
        (Q_hg, verify_hg),
        (Q_fossil, verify_fossil),
        (Q_bzr, verify_bzr),
        (Q_svn, verify_svn),
        (Q_http, verify_http),
    )
    for known_protocol, verifier in verifiers:
        if protocol == known_protocol:
            return verifier(url)
    return None


def try_protocol(url):
    """Probe *url* with each version-control verifier until one succeeds.

    :param url: repository URL to probe
    :returns: the ``Q_*`` item of the first verifier that succeeds, tried in
        the order git, hg, svn, bzr, fossil; ``None`` when all of them fail
    """
    print("trying all known protocols on " + url)
    attempts = (
        (verify_git, Q_git),
        (verify_hg, Q_hg),
        (verify_svn, Q_svn),
        (verify_bzr, Q_bzr),
        (verify_fossil, Q_fossil),
    )
    for verifier, detected in attempts:
        # Stop at the first success so later tools are never run.
        if verifier(url):
            return detected
    return None


def guess_protocol(repository):
    """Determine the protocol of the repository URL held by a claim.

    The URL text is consulted first; a guess made that way must also pass
    verification.  When the URL gives no hint, every version-control
    verifier is tried in turn.

    :param repository: claim whose ``getTarget()`` returns the URL
    :returns: a ``Q_*`` protocol item, or ``None`` when the guess fails
        verification or no protocol could be found
    """
    target = repository.getTarget()
    guessed = guess_protocol_from_url(target)
    if not guessed:
        return try_protocol(target)
    if verify_protocol(target, guessed):
        return guessed
    print("ERROR " + target + " does not obey the expected protocol")
    return None


def fixup_url(repository):
    """Rewrite the claim's URL when it matches a known pattern.

    Two rewrites exist: Apache ``asf?p=`` URLs get ``?p=`` replaced by ``/``,
    and ``bazaar``/``code`` launchpad.net URLs of the form ``~owner/name``
    become ``https://code.launchpad.net/name``.

    :param repository: claim offering ``getTarget()`` and ``changeTarget()``
    :returns: ``True`` when the target was changed, ``False`` otherwise
    """
    current = repository.getTarget()
    replacement = None

    if current.startswith('https://git-wip-us.apache.org/repos/asf?p='):
        replacement = current.replace('?p=', '/')

    launchpad = re.match('http://(?:bazaar|code).launchpad.net/~[^/]+/([^/]+)', current)
    if launchpad:
        replacement = "https://code.launchpad.net/" + launchpad.group(1)

    if not replacement:
        return False

    print("REPLACE " + current + " with " + replacement)
    repository.changeTarget(replacement)
    return True
    

def _fetch_page(url):
    """Return the body text of *url*, or None when it cannot be fetched."""
    try:
        # Bounded wait: without a timeout requests can block indefinitely.
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        return None
    if response.status_code != requests.codes.ok:
        return None
    return response.text


def extract_repository(url):
    """Derive the repository URL hidden behind a web viewer URL.

    CodePlex, Savannah and TuxFamily viewer URLs are translated by pattern
    alone.  SourceForge viewer pages are downloaded and the clone/checkout
    command shown on the page supplies the repository URL.

    :param url: URL stored on the item
    :returns: the repository URL as a string, or ``None`` when *url* is not
        a recognised viewer URL, the page cannot be fetched, or the page
        holds no usable clone command
    """
    # Raw strings keep ``\?`` and ``\w`` regex escapes rather than invalid
    # string escapes; dots in host names are escaped to match only a dot.
    m = re.match(r'https://(.*)\.codeplex\.com/SourceControl/latest', url)
    if m:
        return "https://git01.codeplex.com/" + m.group(1)
    m = re.match(r'http://svn\.savannah\.gnu\.org/viewvc/\?root=(.*)', url)
    if m:
        return "svn://svn.sv.gnu.org/" + m.group(1)
    m = re.match(r'https://svn\.tuxfamily\.org/viewvc\.cgi/(\w+)_(\w+)/', url)
    if m:
        return "svn://svn.tuxfamily.org/svnroot/" + m.group(1) + "/" + m.group(2)
    if re.match(r'https?://sourceforge\.net/p/.*/(git|code|code-git)/ci/(default|master)/tree/', url):
        page = _fetch_page(url)
        if page is None:
            return None
        # A git URL is accepted only when the page shows exactly one.
        u = re.findall(r'git clone (git://git\.code\.sf\.net/p/.*/(?:git|code|code-git))', page)
        if len(u) == 1:
            return u[0]
        u = re.findall(r'hg clone (http://hg\.code\.sf\.net/p/.*/code)', page)
        if len(u) >= 1:
            return u[0]
    if re.match(r'https?://sourceforge\.net/p/.*/(svn|code|code-0)/HEAD/tree/', url):
        page = _fetch_page(url)
        if page is None:
            return None
        u = re.findall(r'svn checkout (svn://svn\.code\.sf\.net.*/trunk)', page)
        if len(u) == 1:
            return u[0]
    return None

# Main loop: for every item returned by QUERY, add repositories discovered
# behind web viewer URLs, normalise known URL forms, then qualify each
# repository statement that lacks a protocol with the detected one.
for item in pg.WikidataSPARQLPageGenerator(QUERY, site=wikidata_site, result_type=list):
    print("WORKING ON https://www.wikidata.org/wiki/"  + item.id)
    item_dict = item.get()
    clm_dict = item_dict["claims"]

    urls = [claim.getTarget() for claim in clm_dict[P_source_code_repository]]

    # Track what the item already has plus what this pass adds, so that two
    # viewer URLs resolving to the same repository add it only once.
    known_urls = set(urls)
    for url in urls:
        extracted = extract_repository(url)
        if extracted and extracted not in known_urls:
            known_urls.add(extracted)
            print("ADDING " + extracted + " as a source repository discovered in " + url)
            source_code_repository = pywikibot.Claim(wikidata_site, P_source_code_repository, 0)
            source_code_repository.setTarget(extracted)
            item.addClaim(source_code_repository)

    for claim in clm_dict[P_source_code_repository]:
        fixup_url(claim)

    for claim in clm_dict[P_source_code_repository]:
        if P_protocol in claim.qualifiers:
            print("IGNORE " + claim.getTarget() + " because it already has a protocol")
            continue
        target_protocol = guess_protocol(claim)
        if not target_protocol:
            # Undetermined protocol: open the item in Firefox and end the
            # whole run with status 0.
            print("SKIP " + claim.getTarget())
            os.system("firefox https://www.wikidata.org/wiki/" + item.id)
            # SystemExit instead of exit(): exit() is a helper installed by
            # the site module and is not guaranteed to exist.
            raise SystemExit(0)
        protocol = pywikibot.Claim(wikidata_site, P_protocol, 0)
        protocol.setTarget(target_protocol)
        claim.addQualifier(protocol, bot=True)
        # Pause between edits (presumably to go easy on the API -- confirm).
        print("waiting")
        time.sleep(2)