root, and the password is the one you set when creating the EMR cluster. Once the correct credentials are entered, you can enter the command line interface and switch to the /usr/local/service/spark directory:
[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /usr/local/service/spark
/opt directory:[hadoop@172 data]$ tar -xzvf kafka_2.10-0.10.2.0.tgz[hadoop@172 data]$ mv kafka_2.10-0.10.2.0 /opt/
telnet command to see whether the EMR cluster is connected to the CKafka instance:[hadoop@172 kafka_2.10-0.10.2.0]$ telnet $kafkaIP 9092Trying $kafkaIP...Connected to $kafkaIP.
[root@172 ~]# su hadoop[hadoop@172 root]$ cd /opt/kafka_2.10-0.10.2.0/
[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic spark_streaming_test
hello world
this is a message
[hadoop@172 kafka_2.10-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server $kafkaIP:9092 --from-beginning --new-consumer --topic spark_streaming_test
hello world
this is a message
D://mavenWorkplace, by running the following commands:
mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart
D://mavenWorkplace directory. The files included in the folder have the following structure:simple---pom.xml Core configuration, under the project root directory---src---main---java Java source code directory---resources Java configuration file directory---test---java Test source code directory---resources Test configuration directory
<!-- Spark core, Spark Streaming, and the Kafka 0.10 direct-stream connector (Scala 2.11 builds, all pinned to 2.0.2) -->
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.2</version>
  </dependency>
</dependencies>
<!-- Compile for Java 8 and produce a fat jar (jar-with-dependencies) during `mvn package` -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>utf-8</encoding>
      </configuration>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaInputDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka010.ConsumerStrategies;import org.apache.spark.streaming.kafka010.KafkaUtils;import org.apache.spark.streaming.kafka010.LocationStrategies;import scala.Tuple2;import java.util.*;import java.util.concurrent.TimeUnit;/*** Created by tencent on 2018/7/3.*/public class KafkaTest {public static void main(String[] args) throws InterruptedException {String brokers = "$kafkaIP:9092";String topics = "spark_streaming_test1"; // Subscribed topics; multiple topics should be separated by ','int durationSeconds = 60; // IntervalSparkConf conf = new SparkConf().setAppName("spark streaming word count");JavaSparkContext sc = new JavaSparkContext(conf);JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(durationSeconds));Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));// Kafka-related parameterMap<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("metadata.broker.list", brokers) ;kafkaParams.put("bootstrap.servers", brokers);kafkaParams.put("group.id", "group1");kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// Create a connectionJavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topicsSet, kafkaParams));// wordcount logicJavaPairDStream<String, Integer> counts = lines.flatMap(x -> 
Arrays.asList(x.value().toString().split(" ")).iterator()).mapToPair(x -> new Tuple2<String, Integer>(x, 1)).reduceByKey((x, y) -> x + y);// Save the resultcounts.dstream().saveAsTextFiles("$hdfsPath","result");//ssc.start();ssc.awaitTermination();ssc.close();}}
mvn package
scp $localfile root@<public IP address>:$remotefolder
D://mavenWorkplace, by running the following commands:
mvn archetype:generate -DgroupId=$yourgroupID -DartifactId=$yourartifactID -DarchetypeArtifactId=maven-archetype-quickstart
<!-- Kafka broker library, client library, and Kafka Streams, all pinned to 0.10.1.0 -->
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.1.0</version>
  </dependency>
</dependencies>
<!-- Compile for Java 8 and produce a fat jar (jar-with-dependencies) during `mvn package` -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>utf-8</encoding>
      </configuration>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** Created by tencent on 2018/7/4.*/public class SendData {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "$kafkaIP:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// The producer sends a messageString topic = "spark_streaming_test1";org.apache.kafka.clients.producer.Producer<String, String> procuder = new KafkaProducer<String,String>(props);while(true){int num = (int)((Math.random())*10);for (int i = 0; i <= 10; i++) {int tmp = (num+i)%10;String value = "value_" + tmp;ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);procuder.send(msg);}try {Thread.sleep(1000*10);}catch (InterruptedException e) {}}}}
mvn package
scp $localfile root@<public IP address>:$remotefolder
[hadoop@172 ~]$ bin/spark-submit --class KafkaTest --master yarn-cluster $consumerpackage
[hadoop@172 ~]$ yarn application --list
[hadoop@172 spark]$ bin/spark-submit --class SendData $producerpackage
[hadoop@172 root]$ hdfs dfs -ls /userFound 9 itemsdrwxr-xr-x - hadoop supergroup 0 2018-07-03 16:37 /user/hadoopdrwxr-xr-x - hadoop supergroup 0 2018-06-19 10:10 /user/hive-rw-r--r-- 3 hadoop supergroup 0 2018-06-29 10:19 /user/pythontest.txtdrwxr-xr-x - hadoop supergroup 0 2018-07-05 20:25 /user/sparkstreamingtest-1530793500000.result[hadoop@172 root]$ hdfs dfs -cat /user/sparkstreamingtest-1530793500000.result/*(value_6,16)(value_7,22)(value_8,18)(value_0,18)(value_9,17)(value_1,18)(value_2,17)(value_3,17)(value_4,16)(value_5,17)
[hadoop@172 ~]$ yarn application --kill $Application-Id
yarn application --list command.
For more information on Kafka, please see the official documentation. Esta página foi útil?
Você também pode entrar em contato com a Equipe de vendas ou Enviar um tíquete em caso de ajuda.
comentários