Kafka Connect: sinking messages to AWS with the S3 connector

Posted by zbdgwd5y on 2021-06-04 in Kafka

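I am trying to get Kafka Connect to write multiple messages to an S3 bucket sink. So far, when I produce multiple lines such as {sessionid:1,userid:1,timestamp:2011-10-10}{sessionid:2,userid:2,timestamp:2011-10-10} and then download the JSON file, the AWS S3 bucket shows only the first line. It says "unable to expand file", and when I click to view the file, it shows only the first message I produced.
This is part of my file: python3-pip RUN pip3 install awscli
The deployment.yaml file: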

command:
  - bash
  - -c
  - |
    aws configure set aws_access_key_id "$aws_access_key" &
    aws configure set aws_secret_key "$aws_secret_key" &
    aws configure set default.region us-west-2 &
    /etc/confluent/docker/run &
    bin/connect-distributed.sh config/worker.properties
    echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
    while [ $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) -eq 000 ] ; do
      echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    nc -vz kafka-connect 8083
    echo -e "\n--\n+> Creating Kafka Connector(s)"
    /scripts/create-connectors.sh  # Note: This script is stored externally from container
    bash properties.file
    sleep infinity
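
For readers who find the shell loop hard to follow, here is a minimal Python sketch of the same "wait until Kafka Connect answers, then create connectors" step. It is an illustration, not part of the original deployment. The function names `http_status` and `wait_for_connect`, the retry limit, and the choice to wait for an explicit 200 (the shell loop only waits while curl reports 000) are all assumptions made for this example.

```python
import time
import urllib.error
import urllib.request


def http_status(url, timeout=5.0):
    """Return the HTTP status code for url, or 0 if no connection could be made.

    The 0 plays the role of the "000" that curl prints when the server is down.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # The server answered, but with an error status such as 404 or 500.
        return err.code
    except OSError:
        # Connection refused, DNS failure, timeout, and similar errors.
        return 0


def wait_for_connect(url, get_status=http_status, sleep=time.sleep,
                     interval_s=5, max_attempts=60):
    """Poll url until it returns 200; return the number of attempts used.

    get_status and sleep are injectable so the loop can be tried without a
    running Kafka Connect worker (hypothetical helper, for illustration only).
    """
    for attempt in range(1, max_attempts + 1):
        if get_status(url) == 200:
            return attempt
        sleep(interval_s)
    raise TimeoutError(f"{url} did not return 200 after {max_attempts} attempts")
```

A call such as `wait_for_connect("http://kafka-connect:8083/connectors")` would block until the REST endpoint from the script above responds with 200. Only after that would the connector creation step be expected to succeed.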

Properties file:

{

"name":"meetups-to-s3",
"config":{

"_comment": "The S3 sink connector class",                                                                                                                                                                                                                      
"connector.class":"io.confluent.connect.s3.S3SinkConnector",                                                                                                                                                                                                    

"_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",                                                                                                                                            
"tasks.max":"1",                                                                                                                                                                                                                                                

"_comment": "Which topics to export to S3",                                                                                                                                                                                                                     
"topics":"meetups",                                                                                                                                                                                                                                             

"_comment": "The S3 bucket that will be used by this connector instance",                                                                                                                                                                                       
"s3.bucket.name":"meetups",                                                                                                                                                                                                                               

"_comment": "The AWS region where the S3 bucket is located",                                                                                                                                                                                                    
"s3.region":"us-west-2",                                                                                                                                                                                                                                        

"_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",                                             
"s3.part.size":"5242880",                                                                                                                                                                                                                                       

"_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",                                                                                                                                                                              
"flush.size":"100000",                                                                                                                                                                                                                                          

"_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",                                                                                                                                                                        
"key.converter":"org.apache.kafka.connect.json.JsonConverter",                                                                                                                                                                                                  
"key.converter.schemas.enable":"false",                                                                                                                                                                                                                         

"_comment": "Kafka Connect converter used to deserialize values",                                                                                                                                                                                               
"value.converter":"org.apache.kafka.connect.json.JsonConverter",                                                                                                                                                                                                
"value.converter.schemas.enable":"false",                                                                                                                                                                                                                       

"_comment": "The type of storage for this storage cloud connector",                                                                                                                                                                                             
"storage.class":"io.confluent.connect.s3.storage.S3Storage",                                                                                                                                                                                                    

"_comment": "The storage format of the objects uploaded to S3",                                                                                                                                                                                                 
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",                                                                                                                                                                                                

"_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",                                                                                          
"schema.compatibility":"NONE",                                                                                                                                                                                                                                  

"_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",                                                                                                                                                  
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",                                                                                                                                                                            

"_comment": "The locale used by the time-based partitioner to encode the date string",                                                                                                                                                                          
"locale":"en",

"_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",                                                                                                                                                            
"timezone":"UTC",                                                                                                                                                                                                                                               

"_comment": "The date-based part of the S3 object key",                                                                                                                                                                                                         
"path.format":"'date'=YYYY-MM-dd/'hour'=HH",                                                                                                                                                                                                                    

"_comment": "The duration that aligns with the path format defined above",                                                                                                                                                                                      
"partition.duration.ms":"3600000",                                                                                                                                                                                                                              

"_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",                                                                                          
"rotate.interval.ms":"60000",                                                                                                                                                                                                                                   

"_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",                                                                                                                                                  
"timestamp.extractor":"Record"

}
}
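
To make the time-based settings above more concrete, the following Python sketch approximates how a record timestamp might map to an S3 object key under `path.format` and `partition.duration.ms`. It is a simplified model, not the connector's code. The `topics` prefix and the `<topic>+<partition>+<offset>.json` file name follow the layout described in the Confluent documentation as far as I know, and the helper names `encoded_partition` and `object_key` are hypothetical.

```python
from datetime import datetime, timezone

# "partition.duration.ms" from the properties file above (one hour).
PARTITION_DURATION_MS = 3_600_000


def encoded_partition(timestamp_ms, duration_ms=PARTITION_DURATION_MS):
    """Approximate the path produced by 'date'=YYYY-MM-dd/'hour'=HH in UTC."""
    # Round the record timestamp down to the start of its partition window.
    window_start_ms = timestamp_ms - (timestamp_ms % duration_ms)
    window_start = datetime.fromtimestamp(window_start_ms / 1000, tz=timezone.utc)
    return window_start.strftime("date=%Y-%m-%d/hour=%H")


def object_key(topic, kafka_partition, start_offset, timestamp_ms,
               topics_dir="topics"):
    """Build an S3 key in the assumed <dir>/<topic>/<path>/<file> layout."""
    file_name = f"{topic}+{kafka_partition}+{start_offset:010d}.json"
    return f"{topics_dir}/{topic}/{encoded_partition(timestamp_ms)}/{file_name}"


if __name__ == "__main__":
    ts = int(datetime(2021, 6, 4, 13, 45, 10, tzinfo=timezone.utc).timestamp() * 1000)
    print(object_key("meetups", 0, 0, ts))
```

Under these assumptions, every record whose Kafka timestamp falls in the same hour lands under the same `date=.../hour=...` prefix. `flush.size` and `rotate.interval.ms` then decide when a file under that prefix is closed and uploaded.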
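So my question: is there a way for Kafka Connect to write multiple messages into a single file, since a topic can hold many messages, something like multipart? Is anything missing from the deployment file or the properties file? Thanks, and please comment if you have questions.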
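
One thing worth checking, offered as a possibility and not a confirmed diagnosis: as far as I know, `io.confluent.connect.s3.format.json.JsonFormat` writes one JSON record per line. An object holding several records is then not a single valid JSON document, and a viewer that expects one document may show only the first record or fail to expand the file. The Python sketch below reads such a file line by line. The sample text and the helper name `read_json_lines` are assumptions for illustration, and the sample uses quoted keys, unlike the messages quoted in the question.

```python
import json


def read_json_lines(text):
    """Parse newline-delimited JSON: one independent JSON record per line."""
    records = []
    for line in text.splitlines():
        line = line.strip()
        if line:  # skip blank lines, e.g. a trailing newline
            records.append(json.loads(line))
    return records


# Hypothetical content of one downloaded S3 object with two records.
sample = (
    '{"sessionid": 1, "userid": 1, "timestamp": "2011-10-10"}\n'
    '{"sessionid": 2, "userid": 2, "timestamp": "2011-10-10"}\n'
)

if __name__ == "__main__":
    for record in read_json_lines(sample):
        print(record)

    # Parsing the whole file as a single JSON document fails, which is
    # similar to what a strict JSON viewer would run into.
    try:
        json.loads(sample)
    except json.JSONDecodeError as err:
        print("not a single JSON document:", err.msg)
```

If the downloaded object parses this way and contains every message, the connector is already batching multiple records into one file and only the viewer is misleading.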
