このページは機械翻訳したものです。

Python Sparkストリーミング・アプリケーションを構築するためのベスト・プラクティス

データ・フローでPythonビルド・プロジェクトを構成するためのヒントとベスト・プラクティス。

Python Sparkアプリケーションでは、JVMランタイムとPythonランタイムの両方に対して同時に依存関係が必要であるため、Javaアプリケーションよりも複雑です。ランタイムには、それぞれ独自のプロジェクトおよびパッケージ管理システムがあります。別のランタイムからの依存関係のパッケージ化を支援するために、データ・フローには依存関係パッケガ・ツールがあります。このツールでは、すべての依存関係が1つのアーカイブにパッケージ化され、このアーカイブをオブジェクト・ストレージにアップロードする必要があります。データ・フローは、このアーカイブからSparkアプリケーションへの依存関係を提供します。

このアーカイブは、Oracle Cloud Infrastructure Object Storageに格納されると、同じ再現性(アーティファクトは動的であるため、異なる依存性ツリーが生成される可能性がある)が保証され、外部ソースからの同じ依存性のダウンロードが停止します。

依存性マネージャの設定および使用方法の詳細は、「データ・フローでのSpark-Submit機能」の項を参照してください。

アプリケーションのarchive.zipを作成する場合、packages.txtに必要なJavaライブラリがリストされ、依存性パッケージャによって、依存関係とともにパッケージ化されます。
次のいずれかのオプションを使用して依存関係を指定します。
  • --packagesオプションまたはspark.jars.packages Spark構成を使用します。プライベート・エンドポイントで実行されているアプリケーションは、プライベート・サブネットからのトラフィックを、パッケージをダウンロードするためにパブリック・インターネットに移動させる必要があります。
  • オブジェクト・ストレージの場所を--jarsまたはspark.jarsにカンマ区切りリストとして指定します。
  • pythonまたはstructured_streaming_java_dependencies_for_python create archive.zipを使用します。
たとえば、spark-sql-kafka-0-10_2.12を含めるには、これをpackages.txtに追加します:
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
次のコマンドを実行します:
docker run --pull always --rm -v $(pwd):/opt/dataflow -it phx.ocir.io/oracle/dataflow/dependency-packager:latest
archive.zipファイルが生成されます:
  adding: java/ (stored 0%)
  adding: java/org.lz4_lz4-java-1.7.1.jar (deflated 3%)
  adding: java/org.slf4j_slf4j-api-1.7.30.jar (deflated 11%)
  adding: java/org.xerial.snappy_snappy-java-1.1.8.2.jar (deflated 1%)
  adding: java/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.0.2.jar (deflated 8%)
  adding: java/org.apache.spark_spark-sql-kafka-0-10_2.12-3.0.2.jar (deflated 5%)
  adding: java/com.github.luben_zstd-jni-1.4.4-3.jar (deflated 1%)
  adding: java/org.spark-project.spark_unused-1.0.0.jar (deflated 42%)
  adding: java/org.apache.kafka_kafka-clients-2.4.1.jar (deflated 8%)
  adding: java/org.apache.commons_commons-pool2-2.6.2.jar (deflated 9%)
  adding: version.txt (deflated 59%)
archive.zip is generated!
ノート

Spark 3.2.1では、データ・フローの実行に使用されるソース・コードおよびライブラリのほとんどは非表示です。データ・フローのSDKのバージョンと一致させる必要はありません。また、サードパーティの依存関係とデータ・フローが競合することもなくなります。詳細は、「ローカルでのOracle Cloud Infrastructure Data Flowアプリケーションの開発、クラウドへのデプロイ」チュートリアルを参照してください。必要なライブラリ・バージョンについては、Spark 3.2.1へのデータ・フローの移行を参照してください。前述のオプションを使用して、ストリーミングの依存関係を解決します。Pythonの構造化ストリーミングJava依存関係の例は、githubで入手できます。

一部のJavaライブラリのシェーディングが必要な場合があります。
Spark 2.4.4またはSpark 3.0.2を使用している場合は、ライブラリをシェーディングする必要があります。別のMavenプロジェクトを作成して、すべてのJava依存関係とその他の微調整(シェードなど)が1つの場所にあるfat JARを作成します。依存関係パッケージャを使用してカスタムJARとして含めます。たとえば、oci-java-sdk-addons-saslを使用すると、Oracle Cloud Infrastructure SDKが一部のサードパーティ・ライブラリのより新しいバージョンに対してコンパイルされるため、ランタイム障害が発生する可能性があります。

サンプルMavenプロジェクト:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.example</groupId>
  <artifactId>SaslFat</artifactId>
  <version>1.0-SNAPSHOT</version>
 
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>3.0.2</spark.version>
  </properties>
 
  <dependencies>
    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-addons-sasl</artifactId>
      <optional>false</optional>
      <version>1.36.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
 
  <build>
    <plugins>
 
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <transformers>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
          <relocations>
            <relocation>
              <pattern>com.google.</pattern>
              <shadedPattern>com.shaded.google.</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.oracle.bmc.</pattern>
              <shadedPattern>com.shaded.oracle.bmc.</shadedPattern>
              <excludes>
                <exclude>com.oracle.bmc.auth.sasl.*</exclude>
              </excludes>
            </relocation>
          </relocations>
          <!-- exclude signed Manifests -->
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
          <artifactSet>
            <excludes>
              <exclude>${project.groupId}:${project.artifactId}</exclude>
            </excludes>
          </artifactSet>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
 
    </plugins>
  </build>
</project>
依存性パッケージャの作業ディレクトリにSaslFat-1.0-SNAPSHOT.jarを配置し、次のコマンドを実行します。
docker run --pull always --rm -v $(pwd):/opt/dataflow -it phx.ocir.io/oracle/dataflow/dependency-packager:latest
SaslFat-1.0-SNAPSHOT.jarは、Java依存性としてarchive.zipにパッケージ化されます:
  adding: java/ (stored 0%)
  adding: java/SaslFat-1.0-SNAPSHOT.jar (deflated 8%)
  adding: version.txt (deflated 59%)
archive.zip is generated!
また、SaslFat-1.0-SNAPSHOT.jarが格納されているjavaフォルダを含むarchive.zipを手動で作成することもできます。