多进程加多线程下载github上2009-2016年所有java项目

前言

最近需要从github上下载从2009年到2016年的所有java项目,于是研究了一下github提供的相关api并使用python进行了实现。
github api地址:https://developer.github.com/v3/

获取github api的权限认证token

github api对访问次数有限制,对未经授权的访问每小时最多访问60次,对经过授权的认证每小时最多访问5000次。因此我们首先需要获取授权的token。

登陆github点击右上角头像,然后点击settings:


点击左下角的Developer Settings:

点击左下方的Personal access tokens再点击右上方的Generate new token,填写相关信息后就可以获得token:

将该token复制下来并牢记在本地,因为之后你将无法再看到这个token,除非重新生成。

获取2009年到2016年所有java项目的下载地址

我们通过github提供的search api来获取2009年到2016年所有java项目的下载地址,相关api的说明文档为:https://developer.github.com/v3/search/#search-repositories
github对这个查询接口有几个限制:

  1. 这个接口每次请求最多返回一百个搜索结果,如果需要得到更多则需要通过page参数控制。
  2. 这个接口对同一个搜索条件最多能看到1000个搜索结果,因此我们需要将搜索分片,每次只搜索某一天的项目。但这样结果总数仍会大于1000条,因此我们过滤掉star数为0的项目,因为这绝大部分是玩具项目或者测试项目,没有太大的参考价值。

下面为查询某一天的所有star数大于0的java项目的代码,注意函数里有对api访问次数用尽的处理。

import requests
import json
import time

headers = {
        "Authorization": "token 46a8dd2ec922c9a006368c6756d46c9bba40c6d0"
    }

def get_download_url_of_oneday(datestr):
    """
    获取某一天的所有下载链接
    :param datestr:查询的日期,形如2018-10-24
    :return:
    """
    url = 'https://api.github.com/search/repositories?q=language:java stars:>=1 created:%s..%s&per_page=100' % (
    datestr,  datestr)
    while True:
        r = requests.get(url, headers=headers)
        # 访问api的次数用尽则休眠10分钟
        if r.status_code == 403:
            time.sleep(60 * 10)
        else:
            res = r.json()
            total_count = res['total_count']
            print('Total count: ', total_count)
            with open('Jsons/1__%s.json' %datestr, 'w', encoding='utf-8') as fw:
                json.dump(res, fw, indent=4)
            if total_count > 100:
                top = math.ceil(total_count / 100) + 1
                for i in range(2, top if top <= 11 else 11):
                    page_url = '%s&page=%d'%(url, i)
                    print(page_url)
                    while True:
                        r = requests.get(page_url, headers=headers)
                        # 访问api的次数用尽则休眠10分钟
                        if r.status_code == 403:
                            time.sleep(60 * 10)
                        else:
                            res = r.json()
                            with open('Jsons/%d__%s.json' %(i, datestr), 'w', encoding='utf-8') as fw:
                                json.dump(res, fw, indent=4)
                            break
            break

以下是返回结果的举例,我们将其存于当前目录下一个叫做Jsons的目录里面。

{
  "total_count": 40,
  "incomplete_results": false,
  "items": [
    {
      "id": 3081286,
      "node_id": "MDEwOlJlcG9zaXRvcnkzMDgxMjg2",
      "name": "Tetris",
      "full_name": "dtrupenn/Tetris",
      "owner": {
        "login": "dtrupenn",
        "id": 872147,
        "node_id": "MDQ6VXNlcjg3MjE0Nw==",
        "avatar_url": "https://secure.gravatar.com/avatar/e7956084e75f239de85d3a31bc172ace?d=https://a248.e.akamai.net/assets.github.com%2Fimages%2Fgravatars%2Fgravatar-user-420.png",
        "gravatar_id": "",
        "url": "https://api.github.com/users/dtrupenn",
        "received_events_url": "https://api.github.com/users/dtrupenn/received_events",
        "type": "User"
      },
      "private": false,
      "html_url": "https://github.com/dtrupenn/Tetris",
      "description": "A C implementation of Tetris using Pennsim through LC4",
      "fork": false,
      "url": "https://api.github.com/repos/dtrupenn/Tetris",
      "created_at": "2012-01-01T00:31:50Z",
      "updated_at": "2013-01-05T17:58:47Z",
      "pushed_at": "2012-01-01T00:37:02Z",
      "homepage": "",
      "size": 524,
      "stargazers_count": 1,
      "watchers_count": 1,
      "language": "Assembly",
      "forks_count": 0,
      "open_issues_count": 0,
      "master_branch": "master",
      "default_branch": "master",
      "score": 10.309712
    }
  ]
}

返回结果中的 full_name 和 url 是我们需要获取的信息。下面是下载2009年到2016年所有符合条件的项目的信息的代码:

import datetime

def get_download_url():
    """
    获取github上符合条件的版本库的信息,包括下载地址等
    :return:
    """
    time_delta = datetime.timedelta(days=1)
    start_date = datetime.date(2009,1,1)
    end_date = datetime.date(2016,12,31)
    d = start_date
    while d <= end_date:
        str_time = d.strftime(format='%Y-%m-%d')
        get_download_url_of_oneday(str_time)
        print('%s download finished.' %str_time)
        d += time_delta

将所有项目信息整合到一个json文件里面

下面我们需要将所有项目的名称和下载地址整合到一个json文件里面,代码如下:

import json
import os

def analyze_repositories_json(path):
    """
    解析使用 get_download_url 函数获取到的json,得到下载地址和版本库名字
    :return:下载地址和版本库名字
    """
    res = []
    with open(path, 'r') as fr:
        j = json.load(fr, encoding='utf-8')
        try:
            for item in j['items']:
                res.append((item['url'] ,item['full_name'].replace('/', '__')))
        except:
            pass
    return res

def analyze_repositories_jsons():
    res = []
    file_list = os.listdir('Jsons')
    file_count = len(file_list)
    for i,file in enumerate(file_list):
        path = os.path.join('Jsons', file)
        res.extend(analyze_repositories_json(path))
        print('Finished update from: %s, %d / %d' %(file, i, file_count))
    with open(os.path.join('RepoJson', 'all_repo.json'), 'w') as fw:
        json.dump(res, fw, indent=4)

其中第一个函数用于分析前面生成的某一个json文件,第二个函数用于将所有项目的信息整合到一个json数组里面并且存储到当前目录下的RepoJson文件夹里面。

下载所有项目的zip文件到本地

我们使用Python的requests库访问前面得到的所有下载地址并将文件下载到本地:

import requests
from zipfile import is_zipfile
import os

def download_repo(url, name):
    '''
    下载一个版本库的zip文件到本地
    :param url:下载地址
    :param name 版本库名字
    :return:
    '''
    save_dir = r"E:\ZipRepos"
    file_name = save_dir + os.sep + name.replace("/","+") + ".zip"
    if is_zipfile(file_name):
        print("%s has existed." %name)
    else:
        while True:
            r = requests.get('%s/zipball' %url, stream=True)
            if r.status_code == 403:
                print("Rate limit, sleep 20 minute.")
                time.sleep(60 * 20)
            else:
                # 过滤大于10M的文件
                if int(r.headers.get('Content-Length', 0)) > 1024 * 1024 * 10:
                    print("Filter %s because its size greater than 10M." %name)
                    break
                f = open(file_name, 'wb')
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
                f.flush()
                f.close()
                print(r'Download from %s and write to %s\%s done.' % (url, save_dir, name))
                break


def download_repos(url_and_name, start_index, end_index):
    """
    批量下载版本库
    :param url_and_name: 下载地址和版本库名字对应的字典
    :param start_index: 从第几个文件开始下载
    :param end_index: 到第几个文件结束
    :return:
    """
    file_count = len(url_and_name)
    for i,url_name in enumerate(url_and_name[start_index:end_index]):
        download_repo(url_name[0], url_name[1])
        print("The %d / %d zip file has been downloaded." %(i + start_index, file_count))
    print('Download finished.')

第一个函数用于下载一个给定地址的文件,第二个函数用于下载所有的文件。这里我把文件的保存路径设为了E盘下的ZipRepos文件夹,添加了对zip文件有效性的检查方便断点续传,添加了 start_index 和 end_index 参数方便指定下载某个区间的文件。下面我们就可以调用以上函数来开始下载了:

import os

if __name__ == '__main__':
    with open(os.path.join('RepoJson', 'all_repo.json'), 'r') as fr:
        url_and_name = json.load(fr)
        download_repos(url_and_name, start_index = 0, end_index=len(url_and_name))

使用多进程加多线程加快下载速度

由于总共的项目数超过了四十万,如梭使用单线程下载的话可能需要超过一个月的时间,因为我使用了多进程加上多线程的方式来加速下载:

import threading
from DataAcquisition import download_repos
import json
from multiprocessing import Pool
import os

def one_process(url_and_name, start, end):

    # 开启5个线程下载
    indexs = list(range(start, end, 8000))
    if indexs[-1] != end:
        indexs.append(end)
    for i in range(len(indexs) - 1):
        t = threading.Thread(target=download_repos,args=(url_and_name, indexs[i], indexs[i+1]),name=str(i))
        t.start()

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(8)
    with open(os.path.join('RepoJson', 'all_repo.json'), 'r') as fr:
        url_and_name = json.load(fr)
        indexs = [0, 40000, 80000, 120000, 160000, 200000, 240000, 280000, 320000, 360000, len(url_and_name)]
        for i in range(10):
            p.apply_async(one_process, args=(url_and_name, indexs[i], indexs[i+1]))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')

这里我开启了十个进程,每个进程使用五个线程,因此平均下来每个线程大概需要下载8000个文件。如果不断电且网速够快的话,40万个项目几天应该能够下完。

项目地址

https://github.com/1033020837/GithubTest