Apache Flumeの使用
Apache Flumeは、大量のストリーミング・イベント・データを効率的に収集および移動します。
Apache Flumeの詳細は、Flumeのドキュメントを参照してください。
Flume構成プロパティ
ビッグ・データ・サービス3.1.1以降に含まれるFlume構成プロパティ。
| 設定 | プロパティ | 説明 |
|---|---|---|
flume-env
|
flume_java_opts
|
Flink Hadoopクラスパス |
flume_user_classpath
|
Flumeユーザー・クラスパス |
Apache Flumeソース、チャネルおよびシンク構成
Flumeイベントは、バイト・ペイロードと文字列属性のオプション・セットを持つデータ・フローの単位です。Flumeエージェントは、イベントが外部ソースから次の宛先(ホップ)に流れるコンポーネントをホストする(JVM)プロセスです。
Flumeソースは、読取り可能な形式のIOTデバイスからFlumeソースへのイベントなど、外部ソースからのイベントを消費するように設計されています。構成されたFlumeソースごとの形式は、Avro、JSON、プレーン・テキストなどです。このデータは、Flumeシンクによって受信されます。
Flumeソースはイベントを受信すると、チャネルに格納されます。最も一般的に使用されるチャネルは、メモリー・チャネル、ファイル・チャネルおよびKafkaチャネルです。チャネルは、シンクによって読み取られるまでデータを保持します。
Flumeシンクは、チャネルからデータを削除し、それを別のFlumeソースまたは外部ストレージ(HDFSやダウンストリーム・プロセスが消費するオブジェクト・ストレージなど)に転送します。
次に、ソース、チャネルおよびシンク構成の例を示します。
HDFSシンクを使用したNetcatソースの例
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
NetcatソースおよびHBaseシンクの例
agent1.sources = netcat1
agent1.sinks = hbase
agent1.channels = Qmemory1
agent1.sources.netcat1.type = netcat
agent1.sources.netcat1.bind = 0.0.0.0
agent1.sources.netcat1.port = 11111
agent1.sinks.hbase.type = hbase2
agent1.sinks.hbase.table = test_table
agent1.sinks.hbase.columnFamily = test_cf
agent1.sinks.hbase.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
agent1.channels.memory1.type = memory
agent1.channels.memory1.capacity = 1000
agent1.channels.memory1.transactionCapacity = 100
agent1.sources.netcat1.channels = memory1
agent1.sinks.hbase.channel = memory1
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
HDFSソースおよびオブジェクト・ストレージ・シンクの例
クラスタにHDFSコネクタを設定するには:
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
KafkaソースおよびHDFSシンクの例
#list hdfs sources
hdfs_agent.sources = hdfs_source
hdfs_agent.channels = memchannel
hdfs_agent.sinks = hdfs_write
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
hdfs_agent.sources.hdfs_source.kafka.bootstrap.servers = kafkassltest-mn0.bmbdcsad1.bmbdcs.oraclevcn.com:6667
hdfs_agent.sources.hdfs_source.kafka.topics = flume-kafka-test
hdfs_agent.sources.hdfs_source.batchSize = 50
hdfs_agent.sources.hdfs_source.kafka.consumer.group.id = customgid
hdfs_agent.sources.hdfs_source.kafka.consumer.security.protocol = SASL_PLAINTEXT
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.mechanism = GSSAPI
hdfs_agent.sources.hdfs_source.kafka.consumer.sasl.kerberos.service.name = kafka
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = <HDFS_PATH>
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
hdfs_agent.channels.memchannel.capacity = 100000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
複数のソース、チャネルおよびシンク構成の例
#netcat and spool directory sources configured with Hbase and HDFS sinks
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
#list hdfs sources
hdfs_agent.sources = hdfs_source spooldir-source
hdfs_agent.channels = filechannel memchannel
hdfs_agent.sinks = hdfs_write hbase
# configure hdfs-agent source
hdfs_agent.sources.hdfs_source.type = netcat
hdfs_agent.sources.hdfs_source.bind = 0.0.0.0
hdfs_agent.sources.hdfs_source.port = 33333
# configure spooldir source
hdfs_agent.sources.spooldir-source.type = spooldir
hdfs_agent.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
hdfs_agent.sources.spooldir-source.fileHeader = false
# properties of hdfs-Cluster1-sink
hdfs_agent.sinks.hdfs_write.type = hdfs
hdfs_agent.sinks.hdfs_write.hdfs.path = oci://TRAINING@bdsdevcluster/new
hdfs_agent.sinks.hdfs_write.hdfs.roll.Interval = 30
hdfs_agent.sinks.hdfs_write.hdfs.writeFormat = Text
hdfs_agent.sinks.hdfs_write.hdfs..fileType = DataStream
# configure hbase sink
hdfs_agent.sinks.hbase.type=hbase2
hdfs_agent.sinks.hbase.table=test_table
hdfs_agent.sinks.hbase.columnFamily= test_cf
hdfs_agent.sinks.hbase.serializer=org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
hdfs_agent.channels.filechannel.capacity = 10000
hdfs_agent.channels.filechannel.type = file
hdfs_agent.channels.filechannel.transactionCapacity = 100
hdfs_agent.channels.filechannel.checkpointDir = /usr/lib/flume/filecheckpoint
hdfs_agent.channels.filechannel.dataDirs = /usr/lib/flume/filechannel
hdfs_agent.channels.memchannel.capacity = 10000
hdfs_agent.channels.memchannel.type = memory
hdfs_agent.sources.hdfs_source.channels = memchannel
hdfs_agent.sources.spooldir-source.channels = filechannel
hdfs_agent.sinks.hdfs_write.channel = memchannel
hdfs_agent.sinks.hbase.channel = filechannel
hdfs_agent.sinks.hdfs_write.hdfs.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hdfs_write.hdfs.kerberosKeytab = <Keytab_path>
hdfs_agent.sinks.hbase.kerberosPrincipal = <User_Principal>
hdfs_agent.sinks.hbase.kerberosKeytab = <Keytab_path>