Spark

PySparkでのk-meanクラスタリング

関係記事:クラスター数の決め方の1つシルエット分析、 k-means++ ビッグデータ処理や機械学習の場合は、巨大データの取り扱いを目的とした分散処理のフレームワークが必要です。特定のアプリケーションに関する実行性能はSpark MLです。今回の記事はSpark MLでk-meanのクラスタリングを解説します。 目次 1. PySparkのクラスタリング 2. 実験・コード __2.1 ライブラリーのインポート __2.2 データ処理 __2.3. シルエットスコアの比較 __2.4. クラスタリングのモデルを作成 __2.5. 可視化 1. Spark MLのk-meanクラスタリング Spark MLはSparkの統計処理、機械学習を分散処理するライブラリです。k-meanはは最も一般的に使われる、事前に定義したクラスタ数までデータを群にする、クラスタリング アルゴリズムです。 spark.mlでのパラメータ: – k は要求するクラスタの数です。 – maxIterations は実行の繰り返しの最大数です。 – initializationMode はランダム初期化 – initializationSteps は k-meansアルゴリズム内でのステップ数を決定します。 – epsilon はk-meansが収束したと見なす距離の閾値を決定します。 – initialModel は初期化に使用されるクラスタの中心点の任意のセットです。 2. 実験・コード 概要 データセット: UCI機械学習リポジトリの白ワインの属性 環境: Databricks Runtime Version: 6.0 ML (includes …

PySparkでのk-meanクラスタリング Read More »

PySparkでの相関行列と可視化(ヒートマップ表)

PySparkのデータ処理一覧 データの2つの系列間の相関関係は統計では一般的な操作になります。今回の記事はPySparkで相関行列行います。PythonのPandasとSpark MLで相関行列を計算してSeabornでヒートマップ表を作成するやり方を比較します。 目次 1.環境とライブラリ(Spark ML) 2.相関行列とは 3.実験のコード 3.1 データセットのロード 3.2 Pandasの相関行列 3.3 ヒートマップ表 3.4 Spark MLの相関行列 3.5ヒートマップ表 4. まとめ 環境 Databricks: Runtime: 5.5 LTS ML (includes Apache Spark 2.4.3, Scala 2.11) 5.5 LTS MLはSpark MLのライブラリがあります。 Spark ML Sparkの統計処理、機械学習を分散処理するライブラリです。spark.mllibとspark.mlの二つのパッケージがあります。SparkのMLlibはMLに移行しつつあります。Spark2.0からはRDDベースのMLlib APIは保守のみになり、今後はDataFrameベースのAPIが標準になるそうです。 ここではPySparkでML APIを使い、相関行列を行います。 2.相関行列とは 相関係数とは、2つのデータの(直線的な)関係性の強さを −1 から +1 の間の値で表した数のこと。相関行列とは、相関係数を並べたものであり、その意味から対称行列になります。 相関係数の計算式 xと yの相関係数 rは次の式で求まる。 ここで、sxy はxとyの共分散 sx は xの標準偏差 sy …

PySparkでの相関行列と可視化(ヒートマップ表) Read More »

PySparkでのデータ結合

PySparkのデータ処理一覧 データを分析する上で、通常は複数のDataFrameを組み合わせすることが必要です。今回は【PySparkでのデータ結合】を説明します。 Union 同じ列を持つDataFrame同士を結合する方法です。 PySparkのデータ結合 unionallでspf1とspf2を結合し、sdf3を作成します。 # PySpark sdf1 = spark.createDataFrame([(‘a1’, 10),(‘a2’, 20),(‘a3’, 30)],[“c1”, “c2”]) sdf2 = spark.createDataFrame([(‘a1’, 100),(‘a2’, 200)],[“c1”, “c2”]) sdf3 = sdf1.unionAll(sdf2) pandasのappend 2つのDataFrame pdf1、pdf2をUnionで結合し、pdf3を作成します。ignore_indexにはTrueを指定して、新たにindexを振り直します。 # Python Pandas import pandas as pd pdf1 = pd.DataFrame([[‘a1’, 10],[‘a2’, 20],[‘a3’, 30]], columns=[“c1”, “c2”]) pdf2 = pd.DataFrame([[‘a1’, 100],[‘a2’, 200]], columns=[“c1”, “c2”]) pdf3 = pdf1.append(pdf2, ignore_index=True) pandasのconcat 2つのDataFrame …

PySparkでのデータ結合 Read More »

PySparkでデータ読み込み

PySparkのデータ処理一覧 今回はdatabricksでtableとcsvと parquetと圧縮ファイルの gzファイルを読み込むコードの例を紹介します。 1. tableからデータ読み込む spark.table と spark.sqlでデータを読み込みます。   Pyspark df = spark.table(“list”) df.show() +—+—-+—+| id|name|age|+—+—-+—+|  1|Andy| 20||  2|Jack| 31||  3| Tom| 41|+—+—-+—+ Pyspark df = spark.sql(“select * from list”) df.show() +—+—-+—+| id|name|age|+—+—-+—+|  1|Andy| 20||  2|Jack| 31||  3| Tom| 41|+—+—-+—+   2. csvファイルを読み込む spark.readでcsvファイルを読み込みます。   Pyspark 例1 df = (spark.read      .option(“inferSchema”, “True”)      …

PySparkでデータ読み込み Read More »

PySparkで欠損値(Null)の取り扱い方法

PySparkのデータ処理一覧   データ分析でよく問題になるのが欠損値の処理です。今回の記事はPySparkで欠損値(Null)の取り扱い方法を紹介します。 先ず、欠損値があるデータを作成します。 Pyspark df = spark.createDataFrame([   (“p001”, 1020, None),   (“p002”, 560, “delivered”),   (“p003”, None, “delivered”),   (“p004”, None, None)],   [“productID”, “unit”, “status”]) df.show() df.count()   +———+—-+———+|productID|unit|   status|+———+—-+———+|     p001|1020|     null||     p002| 560|delivered||     p003|null|delivered||     p004|null|     null|+———+—-+———+Out[1]: 4   1)欠損値の件数 isNull isNotNillで欠損値がない列をフィルタして数えます。 PySpark df2 = df.filter((df[“productID”].isNotNull() & df[“unit”].isNotNull() & df[“status”].isNotNull()))df2.show()df2.count() +———+—-+———+ |productID|unit| …

PySparkで欠損値(Null)の取り扱い方法 Read More »

PySparkでDataFrameに行を追加する方法

PySparkのデータ処理一覧 今回はDataFrameに行を追加する方法を説明します。前回と同じPython とPysparkで比較して色んな例を作成します。 1)DataFrameを作成します。 Python import pandas as pd import numpy as  np pdf = pd.DataFrame(data={‘ColumnA’:np.linspace(1, 3, 3),                         ‘ColumnB’:[‘red’, ‘yellow’,’green’],                         ‘ColumnC’: np.array(1) }) pdf Pyspark data = [(1, ‘red’, 1), (2, ‘yellow’, 1), (3, ‘green’, 1)] sdf = sqlContext.createDataFrame(data, [“ColumnA”, “ColumnB”, “ColumnC”]) display(sdf) Out[1]:    ColumnA ColumnB  ColumnC 0      1.0     red        1 1      …

PySparkでDataFrameに行を追加する方法 Read More »

PySparkでDataFrameに列を追加する方法

PySparkのデータ処理一覧 大量データ処理するとき、高速でスケーラブルな汎用分散処理エンジンのSparkが、よく使われます。 PySparkはSparkを実行するためのPython APIです。今回は PySparkでDataFrameに列を追加する方法を説明します。PythonとPySparkで比較して色んな例を作成します。 1)DataFrame作成します。 Python import pandas as pd import numpy as np df = pd.DataFrame(data=np.array(range(5)), columns=[‘number’]) df       PySpark sdf = spark.range(5).toDF(“number”) display(sdf)   Out[1]:    number 0       0 1       1 2       2 3       3 4       4   2)DataFrameに値の列を追加します。 WithColumn関数を用いて新しく追加します。またlit関数を用いて、複数の値にしています。 new_column1 = 10 Python df[‘new_column1’] = 10 df     PySpark …

PySparkでDataFrameに列を追加する方法 Read More »