えんじにあのじゆうちょう

勉強したことを中心にアウトプットしていきます。

【AlibabaCloud】LogServiceのML機能をJupyterNotebookから使ってみた

はじめに

alibabacloud Advent Calendar 2019 - Qiita 18日目ということで、LogServiceというリアルタイムデータ収集&可視化&処理プロダクトの中でも、たぶんあまり使われていないだろうJupyter Notebookのextensionを利用して、LogServiceを操作する方法について解説します。

LogServiceについて

LogServiceの概要

LogServiceはリアルタイムデータを収集、可視化、分析するプロダクトです。
似ているもので言うとElasticSearch+Kibanaや、私は使ったことがないですが、Splunkも似ているそうです。
マネージド型サービスであるため、データを取り込み分析をするところだけを気にすればよいという点もメリットかなと思います。
www.alibabacloud.com

LogServiceの使い方については、少し古いですが1年ほど前に仕事中に書いたものが参考になると思うので、ご興味のある方は是非参考にしてください。
(1年経つとクラウドプロダクトは別物になっていることも多いので、あくまで参考としてお取り扱いください)

www.slideshare.net

あと、本ブログは完全に趣味の世界であることはご理解ください。

LogServiceの機械学習機能

さて、本記事では上記のSlideShare等でも書いていない機械学習の部分について書きたいと思います。

まず、LogServiceで利用できる機械学習機能ですが、以下に一覧があります。
www.alibabacloud.com

色々なものがありますが、今回はPrediction and anomaly detection functionを試していこうと思います。
www.alibabacloud.com

ある程度触ってみた結果、1つ触り方がわかるとだいたい同じ感じでいけるかな、という感覚でした。

やってみよう

データの用意

まずは時系列っぽいデータを用意します。また、以前*1やったようにオープンデータを用意してもいいのですが、今回は自分で作成することにします。
このあたりはNotebook上で実行しても、生のPythonで実行してもどちらでも大丈夫です。

import numpy as np
def f(x_t1, x_t2):
    return 1.3 * x_t1 - 0.4 * x_t2  + np.random.normal(loc=0, scale=1)

N = 100
np.random.seed(5)
series = np.zeros((N,))
series[0] = np.random.normal(1, 1)
series[1] = np.random.normal(2.0, 1)
for i in range(2, N):
    series[i] = f(series[i-1],series[i-2])

f:id:marufeuillex:20191207124426p:plain

こんな感じのデータができます。

ではこれをアップロードしてみましょう。
今回はデータ量も多くないので、API経由でアップロードします。

以下を参考にしています。
github.com

def put_logs(client, project, logstore, contents, compress=False):
    topic = ''
    source = ''
    logitemList = []  # LogItem list
    for c in contents:
        logItem = LogItem()
        logItem.set_time(int(time.time()))
        logItem.set_contents(c)
        logitemList.append(logItem)
    request = PutLogsRequest(project, logstore, topic, source, logitemList, compress=compress)

    response = client.put_logs(request)
    response.log_print()

endpoint = 'ap-northeast-1.log.aliyuncs.com'
accessKeyId = 'YOUR_ACCESS_KEY'
accessKey = 'YOUR_ACCESSKEY_SECRET'
logstore = 'YOUR_LOGSTORE_NAME'
project = 'YOUR_PROJECT_NAMEE'
client = LogClient(endpoint, accessKeyId, accessKey)

contents = []

for i in range(N):
    contents.append([("timestamp", str(i)), ("value", str(series[i]))])
put_logs(client, project, logstore, contents)

timestampは1秒刻みにしています。
元データやアップロード後のデータ型を文字列型にしたい場合であっても、アップロード前は一旦文字列型に変換するというのがちょっとしたポイントです。
今回でいうとtimestampは整数、valueは浮動小数点ですが、どちらも文字列型に変換しています。

ちなみに、データが大量のときはOSSにアップロードしてからDataIntegrationを使ってインポートしたりするほうが早いかもしれません。
容量や用途に応じて、適切な方法をご選択してください。

Jupyter Notebookの設定

設定については、一応ここに少しだけ書いてあります。
github.com

以下では私の環境においてどのように導入したかを記載します。
私の環境はAnacondaの最新版(Anaconda 2019.10 for Linux Installer)を利用して構築した環境である点はご注意いただき、もし不足が出るような場合は上記のドキュメントに書いてあります。中国語ですが、コード部分くらいは読めると思います。

それでは、環境のセットアップとして以下を実行していきます。

1. 必要なライブラリのインストール
関連するパッケージはcondaでは導入できないので、pipを使います。

$ pip install aliyun-log-python-sdk odps

2. 必要な権限の設定
事前に十分な権限のついたAccessKey/AccessKeySecretを用意してください。
私の場合はAliyunLogFullAccess権限のついたRAMユーザを1つ用意し、そこからAccessKey/AccessKeySecretを発行しています。

その後、以下のようなファイルを作ります。

$ cat ~/.aliyunlogcli
[__jupyter_magic__]
access-id = YOUR_ACCESS_KEY
access-key = YOUR_ACCESS_KEY_SECRET
region-endpoint = ap-northeast-1.log.aliyuncs.com
project = YOUR_PROJECT_NAME
logstore = YOUR_LOGSTORE_NAME

これで準備は整いました!

データの取得&可視化

ここから先はNotebook上で作業してください。

まずは先程入れたデータを取り出してみましょう。

%%log  -1day ~ now
* | SELECT timestamp, value

1行目は1日前〜いままでのデータを対象とするという宣言になります。
LogServiceの場合、データが投入された時期を明示してからその範囲のデータに対して処理をかけていくという流れになります。

2行目がいつものLogServiceのクエリです。
見慣れない方のために簡単に説明すると、最初の*のところはデータのフィルタリング条件を設定します。
例えばさらにデバイス名に関する列がある時、デバイス名などで絞る、といったことが可能です。
その後ろはSQLライクなクエリです。FROMは通常は明示的に書く必要はありません。
ただ、サブクエリを使うときはFROM句の指定が必要で、一律で「FROM log」と指定します。

f:id:marufeuillex:20191207131930p:plain

もう既にお気づきだと思いますが、ちょいちょい中国語が入ってきます・・・
ちなみに私は中国語は一切読めませんが、雰囲気で乗り切りました。
「变量名」はきっとLogServiceからの戻りの結果がlog_dfという名前のオブジェクトに格納されている、ということなんでしょう。

f:id:marufeuillex:20191207132440p:plain

やっぱりそのようですね!!

とりあえず別の変数に代入しておきます。

base_log = log_df

念の為、dtypesを見ると、すべてのカラムがobject型にされてしまっています。

base_log.dtypes

timestamp object
value object
dtype: object

これでは都合が悪いので、変換をします。

base_log["timestamp"] = base_log["timestamp"].astype("int")
base_log["value"] = base_log["value"].astype("float")

それでは、グラフを書いておきましょう。

plt.plot(base_log["timestamp"], base_log["value"])

f:id:marufeuillex:20191207132921p:plain

機械学習1: 単純な異常検知

それでは機械学習関数について試していきましょう。
LogServiceでは時系列データに対して教師なし学習モデルを提供しています。
もう少しいうと、状態空間モデルのようなものは利用できませんが、時系列モデルは利用できます。

では、とりあえず先日記事にも書いたARモデルについて試しましょう。
ARモデルについては以下を参照してください。
t.marufeuille.dev

%%log  -1day ~ now
* | SELECT ts_predicate_ar(timestamp, value, 2)

f:id:marufeuillex:20191207140314p:plain

簡単にARモデルが構築できました。
結果のデータフレームの各列は概ね以下のようなものになります。

# カラム名 大まかな意味
1 anomaly_prob アノマリの確率(0 or 1)
2 lower 予測値の下側範囲
3 predict 予測値(中央)
4 src 実データ
5 unixtime タイムスタンプ(秒)
6 upper 予測値の上側範囲

図だとわかりにくいので、お絵かきしてみましょう。

preds = log_df
# 型変換
preds["unixtime"] = preds["unixtime"].apply(lambda x: int(float(x)))
preds["anomaly_prob"] = preds["anomaly_prob"].astype("float")
preds["lower"] = preds["lower"].astype("float")
preds["predict"] = preds["predict"].astype("float")
preds["src"] = preds["src"].astype("float")
preds["upper"] = preds["upper"].astype("float")
# アノマリのindex一覧作成
anomaly_idx = np.where(preds["anomaly_prob"] == 1)[0]
# 可視化
plt.subplots(figsize=(15,10))
plt.plot(preds["unixtime"], preds["src"], color="gray", label="actual")
plt.plot(preds["unixtime"], preds["predict"], color="orange", label="preds(mean)")
plt.plot(preds["unixtime"], preds["upper"], color="red", linestyle="dashed", label="preds(upper)")
plt.plot(preds["unixtime"], preds["lower"], color="cyan", linestyle="dashed", label="preds(upper)")
plt.scatter(anomaly_idx, preds["src"][anomaly_idx], marker="x", color="red", s=100, label="anomaly")
plt.legend()

f:id:marufeuillex:20191207225353p:plain

とても簡単にモデル化&異常検知ができました。
それにしても、なんでここがアノマリなんだろう?というのはありますが、アノマリ検知は基本的にモデルを作成→モデルと実測値を比べてある一定の閾値を超えたらアノマリ扱いということになるので、きっとそういう判断があったのでしょう。

機械学習2: 未来予測

ts_predicate_**関数群はnPredsというパラメータがあるため、ここに数値を渡すといい感じに以降の予測を出してくれます。
のはずなんですが、普通にやるとうまくいきません。

例えば、先程のAR(2)モデルを利用して以降10個の予測をしたいときに、単純に考えると以下の様にやれば良い気がするのですが、これでは100個目までのデータしかでてきません。

%%log  -1day ~ now
* | SELECT ts_predicate_ar(timestamp, value, 2, 10) 

f:id:marufeuillex:20191207163606p:plain

実は、明示されていませんがLIMIT 100が暗黙的にかかっているようで、予測を含め、100個以上データがあるときはうまくいきません。
今回、データ数は100個で追加で予測したい個数が10個なのでLIMIT 110とします。

%%log  -1day ~ now
* | SELECT ts_predicate_ar(timestamp, value, 2, 10) LIMIT 110

f:id:marufeuillex:20191207163620p:plain

f:id:marufeuillex:20191207225249p:plain

なんかいい感じに予測できてますね。
LogServiceの異常検知は構築したモデルによる予測値を元にした異常検知と実際の値の差分で求めるため、未来データに対してここで異常が来るだろう、といったことが言えるわけではない点に注意してください。

おわりに

今回はAlibabaCloudのLogServiceをJupyterNotebookから使ってみました。
公式SDKをインストールするだけで簡単に利用できるという手軽さについてはご理解いただけたのではないかと思います。
また、今回は本当にごくごく一部の機能をしか使っていませんが、他にもたくさんの機能があります。
マネージドサービスかつ課金も従量なので興味を持っていただいたら、ぜひ試していただけると嬉しいです!