Flume Cluster Setup

Integrating Flume with Kafka

Posted by 冷小冰 on October 26, 2018


I. Software Environment

Linux: CentOS_7_5_64_org
JDK: jdk1.8.0_172
Flume: apache-flume-1.8.0
Three Linux servers:
192.168.3.161
192.168.3.162
192.168.3.163
A Kafka cluster is already running on all three servers.

II. Installation & Configuration

Scenario

Data is collected from the three Kafka servers and written to MySQL through a load-balanced Flume cluster, as shown in the diagram below:

(Architecture diagram: Agents on the three Kafka servers, load-balanced over avro to Collector1 and Collector2, which write to MySQL)

First, an Agent is deployed on each of the three Kafka servers to collect the data Kafka produces. The collected data is then sent, load-balanced, to Collector1 and Collector2, which in turn write it into MySQL.

Installing Flume

# Everything is installed under /usr
cd /usr
# First create the flume project directory and enter it
mkdir flume
cd flume
# Download and extract the software
tar -zxvf apache-flume-1.8.0-bin.tar.gz
# The archive extracts to apache-flume-1.8.0-bin; rename it to match the paths used below
mv apache-flume-1.8.0-bin apache-flume-1.8.0
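
If the tarball is not already on the server, it can be fetched from the Apache archive first (a sketch; verify the mirror URL before relying on it):

wget https://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz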

Configuring environment variables

vim /etc/profile
# Add the following
export FLUME_HOME=/usr/flume/apache-flume-1.8.0
export PATH=$FLUME_HOME/bin:$PATH
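
After saving, reload the profile so the new variables take effect in the current shell, and verify:

source /etc/profile
echo $FLUME_HOME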

Configuring Flume

# Enter the conf directory and list the files
cd /usr/flume/apache-flume-1.8.0/conf
---
flume-conf.properties.template
# startup environment configuration templates
flume-env.ps1.template
flume-env.sh.template
# logging configuration file
log4j.properties

Copy flume-env.sh.template to flume-env.sh, then modify the properties shown below.
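
For example:

cp flume-env.sh.template flume-env.sh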

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
# Environment variables can be set here.

# Java path
export JAVA_HOME=/usr/java/jdk1.8.0_172  
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

# Memory settings
export JAVA_OPTS="-Xms1024m -Xmx1024m -Xss256k -Xmn512m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

# Let Flume write raw event data and configuration information to its log files for debugging
# purposes. Enabling these flags is not recommended in production,
# as it may result in logging sensitive user information or encryption secrets.

# Logging settings
export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
# Note that the Flume conf directory is always included in the classpath.

# Flume installation directory (added to the classpath)
FLUME_CLASSPATH="/usr/flume/apache-flume-1.8.0"
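
Note that the second export references $JAVA_OPTS, so the debug-logging flags are appended to the memory settings above rather than replacing them.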

Verifying the installation

# Check the version
flume-ng version
---
# Expected output
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

III. Cluster Configuration

Configuring the Agents

Agent1, Agent2, and Agent3 run on 192.168.3.161, 192.168.3.162, and 192.168.3.163 respectively. The configuration is identical on all three machines except for the IP addresses.
Create a flume-agent.properties file in /usr/flume/apache-flume-1.8.0/conf with the following content:

# Name the components on this agent: declare the source, channel, and sink names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Describe/configure the source: a Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
# Set this to the local server's IP; it differs on each of the three machines
a1.sources.r1.kafka.bootstrap.servers = 192.168.3.161:9092
a1.sources.r1.kafka.topics = theme,my-topic,test

# Define the sink group: k1 and k2 form one group, load-balanced round-robin
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

# Define sink 1: both sinks forward events over avro to the two collector machines
a1.sinks.k1.type = avro
# Must match the source bind IP in flume-collector.properties
a1.sinks.k1.hostname = 192.168.3.161
a1.sinks.k1.port = 5150

# Define sink 2
a1.sinks.k2.type = avro
# Must match the source bind IP in flume-collector.properties
a1.sinks.k2.hostname = 192.168.3.162
a1.sinks.k2.port = 5150
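
With processor.type set to load_balance and a round_robin selector, events alternate between the two collectors; because backoff is enabled, a collector that fails is temporarily taken out of the rotation and retried later. Setting processor.type to failover instead would send all traffic to one collector and switch to the other only on failure.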

Configuring the Collectors

Collector1 and Collector2 run on 192.168.3.161 and 192.168.3.162 respectively. The configuration is identical on both machines except for the IP address. Create a flume-collector.properties file in /usr/flume/apache-flume-1.8.0/conf with the following content:

# Name the components on this agent: declare the source, channel, and sink names
cr1.sources = r1
cr1.channels = c1
cr1.sinks = k1

# Describe the source: an avro source receiving events from the agents
cr1.sources.r1.type = avro
cr1.sources.r1.channels = c1
# Must match the sink hostname in flume-agent.properties; set each collector's own IP
cr1.sources.r1.bind = 192.168.3.161
cr1.sources.r1.port = 5150

# Describe channel c1, which buffers events in memory
cr1.channels.c1.type = memory
cr1.channels.c1.capacity = 10000
cr1.channels.c1.transactionCapacity = 10000
cr1.channels.c1.byteCapacityBufferPercentage = 20
cr1.channels.c1.byteCapacity = 800000

# Bind the sink to the channel
cr1.sinks.k1.channel = c1

# Describe the MySQL sink; the type is the fully qualified class name of the custom plugin
cr1.sinks.k1.type = org.custom.flume.MysqlSink
cr1.sinks.k1.hostname = 192.168.3.141
cr1.sinks.k1.port = 3306
cr1.sinks.k1.databaseName = mysql
cr1.sinks.k1.tableName = flumeTable
cr1.sinks.k1.user = root
cr1.sinks.k1.password = root
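
Before starting the collectors, it is worth confirming that MySQL is reachable from each collector host (a quick sketch, assuming the mysql command-line client is installed):

mysql -h 192.168.3.141 -P 3306 -u root -p -e "SELECT 1;"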

IV. The MysqlSink Plugin

Create a Maven project

Add the dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.custom</groupId>
	<artifactId>flume-mysql</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<properties>
		<maven.compiler.target>1.8</maven.compiler.target>
		<maven.compiler.source>1.8</maven.compiler.source>
		<version.flume>1.8.0</version.flume>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.flume</groupId>
			<artifactId>flume-ng-core</artifactId>
			<version>${version.flume}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flume</groupId>
			<artifactId>flume-ng-configuration</artifactId>
			<version>${version.flume}</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>6.0.5</version>
		</dependency>
	</dependencies>
</project>
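
Package the project with a standard Maven build; the jar name follows from the artifactId and version in the pom above:

mvn clean package
# produces target/flume-mysql-0.0.1-SNAPSHOT.jar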

Create the MysqlSink class:

package org.custom.flume;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

public class MysqlSink extends AbstractSink implements Configurable {
	private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
	private String hostname;
	private String port;
	private String databaseName;
	private String tableName;
	private String user;
	private String password;
	private PreparedStatement preparedStatement;
	private Connection conn;
	private int batchSize;
	// number of times to retry the connection attempt
	private int retryCount = 5;
	private boolean transactionCompleted = false;

	public MysqlSink() {
		LOG.info("MysqlSink start...");
	}
	@Override
	public void configure(Context context) {
		hostname = context.getString("hostname");
		Preconditions.checkNotNull(hostname, "hostname must be set!!");
		port = context.getString("port");
		Preconditions.checkNotNull(port, "port must be set!!");
		databaseName = context.getString("databaseName");
		Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
		tableName = context.getString("tableName");
		Preconditions.checkNotNull(tableName, "tableName must be set!!");
		user = context.getString("user");
		Preconditions.checkNotNull(user, "user must be set!!");
		password = context.getString("password");
		Preconditions.checkNotNull(password, "password must be set!!");
		batchSize = context.getInteger("batchSize", 100);
		Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
	}
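	// Called repeatedly by Flume's SinkRunner: drains up to batchSize events from
	// the channel inside one transaction and writes them to MySQL as a single JDBC batch.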
	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
		Channel channel = getChannel();
		Transaction transaction = channel.getTransaction();
		Event event;
		String content;
		// collected event bodies for this batch
		List<String> actions = Lists.newArrayList();
		transaction.begin();

		try {
			for (int i = 0; i < batchSize; i++) {
				event = channel.take();
				// a null take() means the channel is currently empty
				if (event != null) {
					content = new String(event.getBody());
					actions.add(content);
				} else {
					result = Status.BACKOFF;
					break;
				}
			}
			if (actions.size() > 0) {
				preparedStatement.clearBatch();
				for (String temp : actions) {
					LOG.info("actions temp:{}", temp);
					// set the placeholder values; placeholder positions are 1-based
					preparedStatement.setString(1, temp);
					preparedStatement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
					preparedStatement.addBatch();
				}
				preparedStatement.executeBatch();
				conn.commit();
			}
			transaction.commit();
		} catch (Throwable e) {
			try {
				transaction.rollback();
			} catch (Exception e2) {
				LOG.error("Exception in rollback.Rollback might not have been successful.", e2);
			}
			LOG.error("Failed to commit transaction.Transaction rolled back.", e);
			Throwables.propagate(e);
		} finally {
			transaction.close();
		}
		return result;
	}

	@Override
	public synchronized void start() {
		super.start();
		do {
			try {
				transactionCompleted = true;
				// Load the JDBC driver (com.mysql.cj.jdbc.Driver is the 6.x connector's class name)
				Class.forName("com.mysql.cj.jdbc.Driver");

				String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName
						+ "?autoReconnect=true&characterEncoding=UTF-8&useUnicode=true&userSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC";
				// Obtain a Connection from DriverManager.getConnection()
				conn = DriverManager.getConnection(url, user, password);
				conn.setAutoCommit(false);
				// Prepare the parameterized insert statement
				preparedStatement = conn
						.prepareStatement("insert into " + tableName + " (content,createTime) values (?,?)");
			} catch (ClassNotFoundException e) {
				e.printStackTrace();
			} catch (SQLException sqlEx) {
				String sqlState = sqlEx.getSQLState();
				if ("08S01".equals(sqlState) || "40001".equals(sqlState)) {
					stop();
					retryCount -= 1;
					transactionCompleted = false;
				} else {
					retryCount = 0;
				}
				sqlEx.printStackTrace();
				// System.exit(1);
			}
		} while (!transactionCompleted && (retryCount > 0));
	}
	@Override
	public synchronized void stop() {
		super.stop();
		if (preparedStatement != null) {
			try {
				preparedStatement.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		if (conn != null) {
			try {
				conn.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
	}
}
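
Returning Status.BACKOFF when the channel runs dry tells the Flume sink runner to pause briefly before calling process() again, instead of spinning on an empty channel.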

Installing the plugin

Build the project into flume-mysql-0.0.1-SNAPSHOT.jar and place it, together with mysql-connector-java-6.0.5.jar, under the Flume installation directory.

Plugin directory layout: under the Flume installation directory, create a plugins.d folder; inside it, create a MysqlSink folder (the name can be anything); and inside that, create three folders named lib, libext, and native (the commands are sketched after this list).

  • plugins.d
    • MysqlSink:
      • lib: the plugin's jar (flume-mysql-0.0.1-SNAPSHOT.jar)
      • libext: the plugin's dependency jars (mysql-connector-java-6.0.5.jar)
      • native: any required native libraries, e.g. .so files (not needed for this setup)
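
A minimal sketch of the commands (/path/to stands in for wherever the jars were built or downloaded):

cd /usr/flume/apache-flume-1.8.0
mkdir -p plugins.d/MysqlSink/lib plugins.d/MysqlSink/libext plugins.d/MysqlSink/native
cp /path/to/flume-mysql-0.0.1-SNAPSHOT.jar plugins.d/MysqlSink/lib/
cp /path/to/mysql-connector-java-6.0.5.jar plugins.d/MysqlSink/libext/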

MySQL table design

Note: the column names must match the ones used in the plugin code.

CREATE TABLE flumeTable (
  ID INT(11) NOT NULL AUTO_INCREMENT,
  CONTENT VARCHAR(4000) NOT NULL,
  CREATETIME TIMESTAMP(6),
  PRIMARY KEY (ID)
) ENGINE = INNODB DEFAULT CHARSET = utf8;

Start and test

  1. Start the agent on each Agent node with the commands shown below:

    
    # cd into the bin directory
    cd /usr/flume/apache-flume-1.8.0/bin
    # start the agent
    nohup ./flume-ng agent --conf /usr/flume/apache-flume-1.8.0/conf --conf-file /usr/flume/apache-flume-1.8.0/conf/flume-agent.properties --name a1 &
    

    Note: a1 in the command is the agent name defined in the configuration file; flume-agent.properties is the configuration file and must be given with its exact path.

    At startup the agent will log errors about the collector addresses being unreachable; these stop once the collectors are up.

  2. Start the collector on each Collector node:

    
    nohup ./flume-ng agent --conf /usr/flume/apache-flume-1.8.0/conf --conf-file /usr/flume/apache-flume-1.8.0/conf/flume-collector.properties --name cr1 &
    

    Note: cr1 in the command is the agent name defined in flume-collector.properties; the configuration file must be given with its exact path.

  3. Write data to Kafka and check whether the rows appear in the MySQL table (a command-line version of this check follows below).

    (Screenshot: the collected events visible in the MySQL table)

    The Flume installation has passed the test.
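
    An end-to-end check can be scripted as follows (a sketch: kafka-console-producer.sh ships with Kafka, and the broker, topic, and MySQL values come from the configurations above; /path/to/kafka is a placeholder):

    # On one of the Kafka brokers, publish a few test events to a subscribed topic
    /path/to/kafka/bin/kafka-console-producer.sh --broker-list 192.168.3.161:9092 --topic test
    > hello flume

    # Then confirm the rows arrived in MySQL
    mysql -h 192.168.3.141 -P 3306 -u root -p -e "SELECT CONTENT, CREATETIME FROM mysql.flumeTable ORDER BY ID DESC LIMIT 5;"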