えんじにあのじゆうちょう

勉強したことを中心にアウトプットしていきます。

【読書】スケーラブルデータサイエンス 第4章

はじめに

今回も引き続き、スケーラブルデータサイエンスです。
今日は第4章。Apache Beamについて中心的にさわってみました。

まとめてみる

Apache Beam

Apache Beamはストリーミング処理、バッチ処理を統一的に扱えるプログラミングモデルです。

beam.apache.org

つまり、ApacheBeamに準じた記載で書くと、ローカル実行はもちろん、SparkやFlink上での実行、GCP Dataflowへの実行をコードの変更なく移行させることが可能です。

また、言語もJava, Pythonなどで記述可能です。
私はJavaは10年来さわっていないので、Pythonでやってみます。

また、ApacheBeamでプログラミングするときに重要な概念として、Pipeline, PCollection, PTransformがあります。
個人的にはPipeline = |, PCollection = Iterable Object, PTransform = PCollection function(PCollection)と理解していますが、正確には以下を参考にすると良いと思います。

beam.apache.org

Apache Beam Python SDK

バッチ処理は問題ないようですが、ストリーミング処理はExperimentalだそうです。

beam.apache.org

現段階では特に問題なさそうな制約に見えるので、まずは進めてみることにします。

Pub/Sub -> BQ

まずは単純にPub/Subから受け取ったデータをBQに書き込むというものを作ってみます。
コード全体はgithubにアップロードしたものを見ていただくとして、かいつまんで説明します。
(順番は説明し易い順に書きますので、必ずしもコードの並び順ではありません)

コード全体は以下をご参照ください。
github.com

パイプラインの定義

まずはパイプラインの定義です。

  pipeline_options = PipelineOptions(argv)
  pipeline_options.view_as(StandardOptions).streaming = True
  with beam.Pipeline(options=pipeline_options) as p:
    lines = p | beam.io.ReadFromPubSub(topic="projects/{}/topics/{}".format(project, input_topic))

    (lines
      | 'flights:toarr' >> beam.Map(lambda fields: fields.decode("utf-8").split(","))
      | 'flights:create_bq_row' >> beam.Map(lambda fields: create_table(fields)) 
      | beam.io.gcp.bigquery.WriteToBigQuery(
                table="{}:{}.{}".format(project, dataset, output_table),
                schema=schema
      )
    )

option(後述)を与えた上で、実際のパイプラインを書いていきます。
パイプライン上の処理は | でつなげ、その処理に名前というかラベルをつけるときは >>で指定します。
今回はReadFromPubSub()で始めていますが、ここを別のソースにすればPub/Sub以外からデータを持ってくることも可能です。

その後の処理について解説します。

 'flights:toarr' >> beam.Map(lambda fields: fields.decode("utf-8").split(","))

まずここでは、fieldsはすべてbyte列で渡ってくるのでutf-8でデコードし、結果を,で分割しています。
このタイミングでは、PTransformはlambda関数内の処理、出力されるPCollectionはカンマ分割された文字列のリストと言えそうです。

そして、

      | 'flights:create_bq_row' >> beam.Map(lambda fields: create_table(fields)) 

これはBQにinsertするときはK=列名、V=値なdictionaryオブジェクトを作る必要があるため、そのための関数(create_table)を読んでいます。
ここではPTransformはcreate_table関数、出力されるPCollectionはdictionaryオブジェクトと言えそうです。

そして最後に、

      | beam.io.gcp.bigquery.WriteToBigQuery(
                table="{}:{}.{}".format(project, dataset, output_table),
                schema=schema
      )

とすることで、BigQueryにデータを挿入しています。
ちなみに、テーブルがないときはschemaに従ってテーブルが作成されます。
ここで実装したschemaのしての仕方の場合、全てNULLABLEになってしまいますので、基本的にはそれは都合が悪いと思いますので、以下を参考にして実装すると良いかと思います。

beam.apache.org

オプションの指定

オプションは、以下のように指定します。

argv = [
      '--project={0}'.format(project),
      '--job_name=streaming-py',
      '--save_main_session',
      '--staging_location=gs://{0}/flights/streaming/staging'.format(bucket),
      '--temp_location=gs://{0}/flights/streaming/temp/'.format(bucket),
      '--max_num_workers=1',
      '--autoscaling_algorithm=THROUGHPUT_BASED',
      '--runner=DataflowRunner'
  ]
    • runnerをDirectRunnerにすれば、ローカルでの実行も可能です。
create_table関数

create_table関数は受け取ったデータのリストを受け取り、それをBQに挿入可能な形式に変換します。
データが空のものは例えNULLABLEなフィールドであっても、keyとして存在するとエラーになるためvalueが空なものは無視させます。

def create_table(fields):
  header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'.split(',')

  featdict = {}
  for name, value in zip(header, fields):
    if value == '':
      continue
    featdict[name] = value
  return featdict

これでPub/Subからデータを受け取って、必要な変換を施して、BQに挿入するサンプルが実装できました!

スライディングウィンドウ

ストリーム処理といえばウィンドウ処理だと思うので(笑)本に沿ってPython版も実装してみました。
github.com

本についてくるJava版を読み替えて作っていますので、無駄にclassとか作ってます。実際にはなくてもいいと思います。

1箇所だけ、軽くハマったので書いておきます。

PerKey

1つだけハマったのがbeam.combiners.Mean.PerKey()です。Keyごとに平均を取るので、直前のMap処理の戻り値を{elm.airport: elm.delay}にしたらだめでした。基本的にBeamでK, Vと言われたらTupleで書くようです。

  delay = (
    flights
    | beam.Map(lambda elm: (elm.airport, elm.delay))
    | beam.combiners.Mean.PerKey()
    | beam.Map(lambda elm: print(elm[0].name, elm[1]))
  )

感想

今回はApacheBeamのpython sdkを利用してstreaming処理及びSlidingWindowを実装してみました。
このくらいの処理であれば、制限にも引っかからず問題なくPythonで実装できることがわかりました。
個人的にはPythonの見栄えのほうが好きなので非常にスッキリかけてよかったという気持ちもあります。

Apache Beamは非常に書きやすく他への移植性も高いため、今後はこれをしっかり使いこなせるように精進します。