在 spark streaming 中输出数据时, 使用pool是比较合适的方法. 原因在 Design Patterns for using foreachRDD 中已经说的很详细, 主要是两点:
-
在driver上创建的connection对象传输到worker上无效;
-
rdd.foreach()或rdd.foreachPartition()中创建connection代价比较大;
而如果每个worker能够共用一个 pool, 那对资源的调配和利用都将比较好控制. spark streaming中已经给出了pool需要满足的条件:
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
team需要用java实现代码, 个人对java熟悉又很有限, 并不知道怎样实现一个”lazily”的pool.
找到这篇Which is the best way to get a connection to an external database per task in Spark Streaming?, 文中有提到:
Yes, that would be the Java equivalence to use static class member, but you should carefully program to prevent resource leakage. A good choice is to use third-party DB connection library which supports connection pool, that will alleviate your programming efforts.
因此, 将pool作为一个类的static属性即可满足要求.
至于怎样实现一个pool的逻辑, 可以直接用commons pool.
这里给出一个简单的demo:
package test;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import java.sql.Connection;
public class MysqlConPool {
private KeyedObjectPool<String, Connection> pool = null;
private static MysqlConPool instance = null;
protected MysqlConPool() {
GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();
config.setMaxTotalPerKey(1);
pool = new GenericKeyedObjectPool<String, Connection>(new MysqlConFactory(), config);
}
public static MysqlConPool getInstance() {
if (instance == null) {
synchronized (MysqlConPool.class) {
if (instance == null) {
instance = new MysqlConPool();
}
}
}
return instance;
}
}
ConFactory的实现参考commons pool的examples即可.
Commons pool本身是线程安全的, 但pool的初始化还是需要用 synchronized 来保证. 因为spark中application的executor执行tasks是多线程的.
In Spark’s execution model, each application gets its own executors, which stay up for the duration of the whole application and run 1+ tasks in multiple threads.
可参考: Integrating Kafka and Spark Streaming: Code Examples and State of the Game