[Python]TwitterAPIを使用しtweet内容をMySQLに随時Insertする

趣味で作っているアプリのために
特定のツイートをDataBase(MySQL)に常時Insertし続けるプログラムが必要になり
Raspberry Piで実行可能なPythonでコードを書き、サービスに登録するまでの手順を記録として書いていきます。

・環境
 Raspberry Pi 3B+(HDDのみでの運用)
  OS = Raspbian 10.4
 Windows10
 VS Code
 Python3.7.7

目次
Pythonコード
Raspbianにサービス登録

・Pythonコード
TwitterAPIを使用してツイートを取得し、
正規表現でツイート内容をカラム毎に分割し、Insert・Commitをループで行います。

import json
import re
import subprocess
import sys
import time
import datetime
from requests_oauthlib import OAuth1Session
import MySQLdb
import os
#Twitter_API_Key
CK = 'Consumer Key'
CS = 'Consumer Key Secret'
AT = 'Access Token'
AS = 'Access Token Secret'
#MySQL_Connect
MySQL_USER = "Administrator"
MySQL_PASS = "password"
MySQL_HOST = "xxx.xxx.xxx.xxx" #localhost
MySQL_PORT = 12345
MySQL_DB = "DataBase"
MySQL_InsertText = "insert into Live_Tweet(ID, Sansen_ID, Enemy_LV, Enemy_NAME, Tweet_ID, Tweet_DATE) "
MySQL_from_TableName = " from Live_Tweet"
Not_EnemyLV = "???"
FILTER_URL = 'https://stream.twitter.com/1.1/statuses/filter.json'
#LogFile関連
Log_FILE_Pass = os.path.join(os.path.dirname(__file__), "All_LOG.log")
Log_FILE_Pass_check = os.path.isfile(Log_FILE_Pass)
Log_Text = ""
def Log_FILE_check():
    if None or not Log_FILE_Pass_check:
        with open(Log_FILE_Pass, mode="w") as f:
            f.write(Log_Text)
#MySQL接続
def MySQL_Connect():
    return MySQLdb.connect(
        unix_socket = '/Applications/MAMP/tmp/mysql/mysql.sock',
        user = MySQL_USER,
        passwd = MySQL_PASS,
        host = MySQL_HOST,
        port = MySQL_PORT,
        db = MySQL_DB,
        charset='utf8',
        use_unicode=True)
#文字列から参戦IDを抽出
def parse(string):
    pattern = r'[0-9A-F]{8}\s:参戦ID'
    matchOB = re.findall(pattern, string)
    if matchOB:
        return matchOB[-1][0:8] 
    else:
        return None
#文字列から敵情報を抽出
def parse_sub(string):
    pattern = r'Lv[0-9\?]{2,3}\s[\D]{2,20}\n' 
    matchOB = re.findall(pattern, string) 
    if matchOB:
        return matchOB[-1] 
    else:
        return None
#文字列から敵情報を抽出2
def parse_sub2(string):
    pattern = r'参加者募集!\n[\D]{0,5}[\D]{2,20}\n' 
    matchOB = re.findall(pattern, string) 
    if matchOB:
        return matchOB[-1] 
    else:
        return None
#文字列からLVを抽出(ないものもある)
def parse_LV(string):
    pattern = r'Lv[0-9\?]{2,3}\s'
    matchOB = re.findall(pattern, string)
    index = len(matchOB[-1])
    return string[2:index - 1]
#文字列から名前を抽出
def parse_NAME(string):
    pattern = r'Lv[0-9\?]{2,3}\s'
    matchOB = re.findall(pattern, string)
    index = len(matchOB[-1])
    index_sub = string.find('\n', 1)
    return string[index:index_sub]
#文字列から名前を抽出2
def parse_NAME_sub(string):
    pattern = r'\n[\D]{0,5}[\D]{2,20}\n'
    matchOB = re.findall(pattern, string)
    string = str(matchOB[-1])
    return string.replace('\n', '')
#tweetをDBに登録するInsert_SQL
def Insert_MySQL(tweet, raid_id, enemy_lv, enemy_name):
    tm = time.strftime('%Y-%m-%d %H:%M:%S')
    tw_id = tweet.get('user').get('screen_name')
    sql_1 = " select COALESCE(max(ID)+1, 1), "
    sql_2 = " '" + str(raid_id) + "', "
    sql_3 = " '" + str(enemy_lv) + "', "
    sql_4 = " '" + str(enemy_name) + "', "
    sql_5 = " '" + str(tw_id) + "', "
    sql_6 = " '" + tm + "' "
    return MySQL_InsertText + sql_1 + sql_2 + sql_3 + sql_4 + sql_5 + sql_6 + MySQL_from_TableName
#Insertテーブルを全削除
def TRUNCATE_MySQL():
    return "truncate table Live_Tweet"
#現在の最大行数を取得
def Select_Row_Count():
    return "SELECT MAX(ID) FROM Live_Tweet"
#Log書き込み
def Log_Insert_Text(string):
    with open(Log_FILE_Pass, mode="a") as f:
        f.write("\n" + str(string))
#MainLoop処理
def main():
    try:
        #Logファイルチェック
        Log_FILE_check()
        #MySQL接続開始
        conn = MySQL_Connect()
        # カーソルを取得する
        cur = conn.cursor()
        # OAuth
        oauth_session = OAuth1Session(CK, CS, AT, AS)
        params = {'track': ':参戦ID\n参加者募集!'}
        req = oauth_session.post(FILTER_URL, params=params, stream=True)
        #現在時刻取得
        DateTime_NOW = datetime.datetime.now()
        Delete_DateTime = DateTime_NOW + datetime.timedelta(minutes = 30)
        for line in req.iter_lines():
            #30分ごとにテーブル内データを削除
            if datetime.datetime.now() > Delete_DateTime:
                cur.execute(Select_Row_Count())
                rows = cur.fetchall()
                cur.execute(TRUNCATE_MySQL())
                DateTime_NOW = datetime.datetime.now()
                Delete_DateTime = DateTime_NOW + datetime.timedelta(minutes = 30)
                Log_Insert_Text("「" + str(DateTime_NOW) + "」 30分ごとのテーブル再作成実行終了・ Rows → " + str(rows[0]))
            line_decode = line.decode('utf-8')
            if line_decode != '': # if not empty
                tweet = json.loads(line_decode)
                # pass tweets via the game page
                if tweet.get('source') == '<a href="http://granbluefantasy.jp/">グランブルー ファンタジー</a>':
                    raid_id = parse(tweet.get('text'))
                    if raid_id:
                        enemy_setting = parse_sub(tweet.get('text'))
                        if enemy_setting:
                            enemy_lv = parse_LV(enemy_setting)
                            enemy_name = parse_NAME(enemy_setting)
                            cur.execute(Insert_MySQL(tweet, raid_id, enemy_lv, enemy_name))
                            conn.commit()
                        else:
                            enemy_setting = parse_sub2(tweet.get('text'))
                            enemy_lv = Not_EnemyLV
                            enemy_name = parse_NAME_sub(enemy_setting)
                            cur.execute(Insert_MySQL(tweet, raid_id, enemy_lv, enemy_name))
                            conn.commit()
    except Exception as ex:
        conn.close()
        Log_Insert_Text(ex)
        sys.exit()
if __name__ == "__main__":
    main()

まず、「params = {'track': ':参戦ID\n参加者募集!'}」で検索パラメータとして2つ目の引数が含まれるツイートをすべて取得するようにしています。

そしてメインとなるループに入り、条件に一致するツイートを取得するたびに
取得した内容を正規表現で分割していきMySQLにInsertします。
VB.netでMySQLにInsertする場合は自動でコミットしてくれますがPythonでは毎回コミット処理を記述します
(本来パフォーマンスの悪化を防ぐためにSQLの発行回数やコミット回数は極力少ないほうがいいのですが処理が処理なので・・・)

Insert文ですがシーケンスは使用せずにプライマルキーである「ID」に値をセットする際、
select COALESCE(max(ID)+1, 1)」でマックス値+1を入れています。(Row(行)がない場合は1を代入)

実際に稼働させるOSはRaspbianですがとりあえず1時間ほどwindows10でテストし、
ログの書き出しや例外時にもログに書き込まれるか確認が終わったらRaspbianの操作に入ります。

・Raspbianにサービス登録
1.ファイルのコピー(コピペ新規作成)
2.サービスファイルの作成
3.サービスの開始と確認

スクリプトファイルを置くディレクトリは参考にした記事(リンク→Qiita)のコメント欄にあるように「/opt」に配置します

sudo mkdir /opt/Python

今回は上記ディレクトリにフォルダを作りその中に[ search.py ]をコピーまたは作成します。

cd /opt/Python
sudo nano search.py
-- コードをコピペし保存 --

動作確認をしたい場合は

Python3 search.py

と、コマンドをたたけば実行されます。(終了したい場合はSSHで接続しているTera Term等を終了すると止まります。)

次にサービスファイルを作成します。

sudo nano /etc/systemd/system/gbf_tweet_insert.service
- 上記ファイルに記述する内容は下記文です --
[Unit]
Description=do something
[Service]
ExecStart=/usr/bin/python /opt/Python/search.py
[Install]
WantedBy=multi-user.target

ExecStart=/usr/bin/python ここはPythonの絶対パスを入力(デフォルトはPython2が使用されるようになっているので注意)
/opt/Python/search.py 自動起動する対象のPythonファイルの絶対パスを入力
サービスファイルの作成が終了したら有効化します。

sudo systemctl enable gbf_tweet_insert.service
または
sudo systemctl enable gbf_tweet_insert

-- スタート --
sudo systemctl start gbf_tweet_insert
-- ストップ --
sudo systemctl stop gbf_tweet_insert
-- 確認 --
sudo systemctl status gbf_tweet_insert

スタートできなかった場合などに確認(status)を実行すると前回どの部分のコードで落ちたかが確認できます。
(pip installが終わっていなかった場合は import 対象モジュール が表示されたりします)

-- 停止時のステータス --
@raspberrypi:/opt/Python $ sudo systemctl stop gbf_tweet_insert
@raspberrypi:/opt/Python $ sudo systemctl status gbf_tweet_insert
● gbf_tweet_insert.service - do something
Loaded: loaded (/etc/systemd/system/gbf_tweet_insert.service; enabled; vendor preset: enabled)
Active: inactive (dead) since Sat 2020-10-17 22:03:03 JST; 5s ago
Process: 9024 ExecStart=/usr/bin/python /opt/Python/search.py (code=killed, signal=TERM)
Main PID: 9024 (code=killed, signal=TERM)Oct 17 16:13:00 raspberrypi systemd[1]: Started do something.
Oct 17 22:03:03 raspberrypi systemd[1]: Stopping do something...
Oct 17 22:03:03 raspberrypi systemd[1]: gbf_tweet_insert.service: Main process exited, code=killed, status=15/TERM
Oct 17 22:03:03 raspberrypi systemd[1]: gbf_tweet_insert.service: Succeeded.
Oct 17 22:03:03 raspberrypi systemd[1]: Stopped do something.

-- 稼働時のステータス --
● gbf_tweet_insert.service - do something
Loaded: loaded (/etc/systemd/system/gbf_tweet_insert.service; enabled; vendor preset: enabled)
Active: active (running) since Sat 2020-10-17 22:07:49 JST; 2h 14min ago
Main PID: 10310 (python)
Tasks: 1 (limit: 2068)
CGroup: /system.slice/gbf_tweet_insert.service
mq10310 /usr/bin/python /opt/Python/search.py
Oct 17 22:07:49 raspberrypi systemd[1]: Started do something.

stopコマンドでPythonの実行を停止できます。
threadを使用してちゃんとタスクを殺せていないソースだと止まらないので注意してください。

・最後に
パフォーマンス的に不安はあったのですが
3B+でもパフォーマンス低下せずにこのブログも表示できているので満足です。
そのうち4の4GB or 8GBに乗せ換えたいですね。

 

コメント

タイトルとURLをコピーしました