版本使用:flume190,kafka200,hadoop260
users.sources=usersSource users.channels=usersChannel users.sinks=usersSink //:指定source的类型为spooldir,即监控指定目录中新出现的文件,并将这些文件的内容作为事件(events)来处理。 users.sources.usersSource.type=spooldir //source监控的目录 users.sources.usersSource.spoolDir=/opt/tmp/flume/users //反序列化器为LINE,即按行读取文件内容 users.sources.usersSource.deserializer=LINE //每行的最大长度为320000个字符 users.sources.usersSource.deserializer.maxLineLength=320000 //指定只处理文件名匹配该正则表达式的文件 users.sources.usersSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv //配置了一个 users.sources.usersSource.interceptors=head_filter users.sources.usersSource.interceptors.head_filter.type=regex_filter //配置 users.sources.usersSource.interceptors.head_filter.regex=^user_id* users.sources.usersSource.interceptors.head_filter.excludeEvents=true users.channels.usersChannel.type=file //checkpoint目录,用于存储channel的元数据。 users.channels.usersChannel.checkpointDir=/opt/tmp/checkpoint/users //数据存储目录 users.channels.usersChannel.dataDirs=/opt/tmp/checkpoint/data/users //sink的类型为KafkaSink,即将事件发送到Kafka。 users.sinks.usersSink.type=org.apache.flume.sink.kafka.KafkaSink //每批次发送的事件数量为0 users.sinks.usersSink.batchSize=0 //Kafka broker的地址和端口 users.sinks.usersSink.brokerList=192.168.52.146:9092 users.sinks.usersSink.topic=users //要传输的位置,kafka的主题名 users.sources.usersSource.channels=usersChannel users.sinks.usersSink.channel=usersChannel
mkdir /opt/tmp/checkpoint/users
mkdir /opt/tmp/checkpoint/data/users
mkdir /opt/tmp/flume/users
启动Kafka
首先启动zookeeper
[root@centos143 soft]# zkServer.sh start
[root@centos143 soft]# zkServer.sh status
其次启动kafka服务
[root@centos143 soft]# kafka-server-start.sh -daemon /opt/soft/kafka200/config/server.properties
启动consumer消费者
kafka-console-consumer.sh --bootstrap-server 192.168.52.146:9092 --topic users
启动flume
./bin/flume-ng agent --name userfriends --conf ./ngcf/ --conf-file ./ngcf/userfriends.conf -Dflume.root.logger=INFO,consol
数据就导入成功啦
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- oldu.cn 版权所有 浙ICP备2024123271号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务