View Javadoc
1   package guru.mikelue.jdut.decorate;
2   
3   import java.sql.Connection;
4   import java.sql.DatabaseMetaData;
5   import java.sql.JDBCType;
6   import java.sql.ResultSet;
7   import java.sql.SQLException;
8   import java.util.ArrayList;
9   import java.util.Collections;
10  import java.util.HashMap;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.Optional;
14  import java.util.concurrent.ConcurrentHashMap;
15  
16  import javax.sql.DataSource;
17  
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  import guru.mikelue.jdut.datagrain.DataRow;
22  import guru.mikelue.jdut.datagrain.DataRowException;
23  import guru.mikelue.jdut.datagrain.SchemaColumn;
24  import guru.mikelue.jdut.datagrain.SchemaTable;
25  import guru.mikelue.jdut.jdbc.JdbcSupplier;
26  import guru.mikelue.jdut.jdbc.JdbcTemplateFactory;
27  import guru.mikelue.jdut.jdbc.JdbcVoidFunction;
28  import guru.mikelue.jdut.jdbc.SQLExceptionConvert;
29  import guru.mikelue.jdut.jdbc.util.MetaDataWorker;
30  
31  /**
32   * Loads database schema and validating rows.<br>
33   *
34   * To improve performance, this object would cache {@link SchemaTable} by its name,
35   * but this class is thread-safe for the caching mechanism.
36   */
37  public class TableSchemaLoadingDecorator implements DataGrainDecorator {
38  	private final static Logger logger = LoggerFactory.getLogger(TableSchemaLoadingDecorator.class);
39  
40  	private final DataSource dataSource;
41  	private Map<String, SchemaTable> cachedTables = new ConcurrentHashMap<>(32);
42  
43  	public TableSchemaLoadingDecorator(DataSource newDataSource)
44  	{
45  		dataSource = newDataSource;
46  	}
47  
48  	/**
49  	 * {@inheritDoc}
50  	 */
51  	@Override
52  	public void decorate(DataRow.Builder rowBuilder)
53  	{
54  		if (rowBuilder.getValidated()) {
55  			return;
56  		}
57  
58  		/**
59  		 * Loads scehma from cache or JDBC meta-data
60  		 */
61  		SchemaTable table = rowBuilder.getTable();
62  		String tableIdentifier = table.getFullTableName();
63  		logger.debug("Decorate table for loading schema: [{}]", tableIdentifier);
64  
65  		if (!cachedTables.containsKey(tableIdentifier)) {
66  			logger.debug("First time of loading schema");
67  			SchemaTable newTableSchema = loadSchema(table);
68  
69  			cachedTables.put(tableIdentifier, newTableSchema);
70  		}
71  		// :~)
72  
73  		rowBuilder.tableSchema(cachedTables.get(tableIdentifier));
74  
75  		/**
76  		 * Validates the row data if it follow the definitions of schema.
77  		 */
78  		try {
79  			rowBuilder.validate();
80  		} catch (DataRowException e) {
81  			logger.error("Validation of row[{}] has error", rowBuilder.getTable());
82  			throw new RuntimeException(e);
83  		}
84  		// :~)
85  	}
86  
87  	private SchemaTable../../guru/mikelue/jdut/datagrain/SchemaTable.html#SchemaTable">SchemaTable loadSchema(SchemaTable source)
88  	{
89  		JdbcSupplier<SchemaTable> funcForLoadingSchema = JdbcTemplateFactory.buildSupplier(
90  			() -> dataSource.getConnection(),
91  			conn -> loadSchema(conn, source)
92  		);
93  
94  		try {
95  			return funcForLoadingSchema.getJdbc();
96  		} catch (SQLException e) {
97  			logger.error("SQL error while loading schema of table: {}", source.toString());
98  			throw SQLExceptionConvert.runtimeException(e);
99  		}
100 	}
101 
102 	private SchemaTablee/jdut/datagrain/SchemaTable.html#SchemaTable">SchemaTable loadSchema(Connection conn, SchemaTable sourceTable)
103 		throws SQLException
104 	{
105 		DatabaseMetaData metaData = conn.getMetaData();
106 		MetaDataWorker metaDataWorker = new MetaDataWorker(metaData);
107 		SchemaAndTableName cananicalName = processSchemaAndTableName(metaDataWorker, sourceTable);
108 
109 		logger.debug("Load schema for: {}", cananicalName);
110 
111 		/**
112 		 * In order to respect the case-sensitive of identifiers,
113 		 * this new table doesn't clone from old table schema
114 		 */
115 		return SchemaTable.build(tableBuilder -> {
116 			tableBuilder.metaDataWorker(metaDataWorker);
117 			tableBuilder.schema(cananicalName.schema);
118 			tableBuilder.name(cananicalName.table);
119 			tableBuilder.keys(sourceTable.getKeys().toArray(new String[0]));
120 
121 			Map<String, SchemaColumn> loadedColumns = loadColumns(cananicalName, metaData);
122 			for (SchemaColumn column: loadedColumns.values()) {
123 				tableBuilder.column(column);
124 			}
125 
126 			/**
127 			 * Loads keys if there is no set one
128 			 */
129 			if (sourceTable.getKeys().isEmpty()) {
130 				logger.debug("Fetch keys for table: \"{}\"", sourceTable.getName());
131 
132 				String[] keys = loadKeys(loadedColumns, metaData, cananicalName);
133 				tableBuilder.keys(keys);
134 			}
135 			// :~)
136 		});
137 		// :~)
138 	}
139 
140 	private SchemaAndTableName processSchemaAndTableName(MetaDataWorker metaDataWorker, SchemaTable sourceTable)
141 		throws SQLException
142 	{
143 		String catalog = sourceTable.getCatalog()
144 			.map(metaDataWorker::processIdentifier)
145 			.orElse(null);
146 		String sourceTableName = metaDataWorker.processIdentifier(
147 			sourceTable.getName()
148 		);
149 
150 		if (
151 			!metaDataWorker.supportsSchemasInTableDefinitions() ||
152 			!sourceTableName.contains(".") ||
153 			sourceTable.getSchema().isPresent()
154 		) {
155 			return new SchemaAndTableName(
156 				catalog,
157 				metaDataWorker.processIdentifier(sourceTable.getSchema().orElse(null)),
158 				sourceTableName
159 			);
160 		}
161 
162 		String[] schemaAndTableName = sourceTableName.split("\\.");
163 		if (schemaAndTableName.length > 2) {
164 			throw new RuntimeException(
165 				String.format("Cannot recgonize schema and table name: \"%s\"", sourceTableName)
166 			);
167 		}
168 
169 		return new SchemaAndTableName(
170 			catalog,
171 			metaDataWorker.processIdentifier(schemaAndTableName[0]),
172 			metaDataWorker.processIdentifier(schemaAndTableName[1])
173 		);
174 	}
175 
176 	private Map<String, SchemaColumn> loadColumns(
177 		SchemaAndTableName cananicalName,
178 		DatabaseMetaData metaData
179 	) {
180 		logger.debug("Load columns for: {}", cananicalName);
181 
182 		JdbcSupplier<Map<String, SchemaColumn>> jdbcGetColumns = JdbcTemplateFactory.buildSupplier(
183 			() -> metaData.getColumns(
184 				cananicalName.catalog,
185 				cananicalName.schema, cananicalName.table,
186 				null
187 			),
188 			/**
189 			 * Builds information of columns
190 			 */
191 			(ResultSet rsColumns) -> {
192 				Map<String, SchemaColumn> columns = new HashMap<>();
193 
194 				while (rsColumns.next()) {
195 					String columnName = rsColumns.getString("COLUMN_NAME");
196 					JDBCType jdbcType = JDBCType.valueOf(rsColumns.getInt("DATA_TYPE"));
197 
198 					logger.debug("Loading meta-data of columns: \"{}\". Type: [{}]",
199 						columnName, jdbcType
200 					);
201 
202 					JdbcVoidFunction<SchemaColumn.Builder> columnBuilder = builder -> {
203 						builder
204 							.name(columnName)
205 							.jdbcType(jdbcType)
206 							.defaultValue(rsColumns.getString("COLUMN_DEF"));
207 
208 						try {
209 							String autoIncremental = rsColumns.getString("IS_AUTOINCREMENT");
210 							if (autoIncremental != null) {
211 								builder.autoIncremental("YES".equals(autoIncremental) ? true : false);
212 							}
213 						} catch (SQLException e) {
214 							logger.info("This database doesn't have \"IS_AUTOINCREMENT\" meta data of JDBC");
215 						}
216 
217 						switch (rsColumns.getInt("NULLABLE")) {
218 							case DatabaseMetaData.columnNullable:
219 								builder.nullable(true);
220 								break;
221 							case DatabaseMetaData.columnNoNulls:
222 								builder.nullable(false);
223 								break;
224 						}
225 					};
226 
227 					SchemaColumn loadedColumn =
228 						SchemaColumn.build(columnBuilder.asConsumer());
229 					columns.put(loadedColumn.getName(), loadedColumn);
230 				}
231 				// :~)
232 
233 				return columns;
234 			}
235 		);
236 
237 		return jdbcGetColumns.asSupplier().get();
238 	}
239 
240 	private String[] loadKeys(
241 		Map<String, SchemaColumn> columnsInfo,
242 		DatabaseMetaData metaData,
243 		SchemaAndTableName cananicalName
244 	) {
245 		logger.debug("Load keys for: {}", cananicalName);
246 
247 		List<String> keys = null;
248 
249 		/**
250 		 * Loads keys by DatabaseMetaData#getPrimaryKeys
251 		 */
252 		JdbcSupplier<List<String>> loadKeysByPk = JdbcTemplateFactory.buildSupplier(
253 			() -> metaData.getPrimaryKeys(
254 				cananicalName.catalog,
255 				cananicalName.schema,
256 				cananicalName.table
257 			),
258 			rs -> {
259 				List<String> loadedKeys = new ArrayList<>(4);
260 
261 				while (rs.next()) {
262 					String pkName = rs.getString("COLUMN_NAME");
263 
264 					logger.debug("Fetch key by primary key: \"{}\"", pkName);
265 					loadedKeys.add(pkName);
266 				}
267 
268 				return loadedKeys;
269 			}
270 		);
271 		keys = loadKeysByPk.asSupplier().get();
272 		if (!keys.isEmpty()) {
273 			return keys.toArray(new String[0]);
274 		}
275 		// :~)
276 
277 		logger.debug("Deduce keys for: {}", cananicalName);
278 		String[] deducedKeys = deduceKeys(
279 			columnsInfo,
280 			() -> metaData.getIndexInfo(
281 				cananicalName.catalog,
282 				cananicalName.schema,
283 				cananicalName.table,
284 				true, true
285 			)
286 		).toArray(new String[0]);
287 
288 		return deducedKeys;
289 	}
290 
291 	private List<String> deduceKeys(
292 		Map<String, SchemaColumn> columnsInfo,
293 		JdbcSupplier<ResultSet> rsUniqueIndexSupplier
294 	) {
295 		JdbcSupplier<List<String>> supplier = JdbcTemplateFactory.buildSupplier(
296 			rsUniqueIndexSupplier,
297 			rs -> {
298 				Map<String, List<String>> indexes = new HashMap<>(4);
299 				Map<String, Integer> numberOfNullableColumns = new HashMap<>(4);
300 
301 				while (rs.next()) {
302 					String columnName = rs.getString("COLUMN_NAME");
303 					if (columnName == null) {
304 						continue;
305 					}
306 
307 					String indexName = rs.getString("INDEX_NAME");
308 					SchemaColumn schemaColumn = columnsInfo.get(columnName);
309 
310 					logger.debug(
311 						"Collected information of unique index: \"{}\" and column: \"{}\". Nullable: [{}]",
312 						indexName, columnName, schemaColumn.getNullable().get()
313 					);
314 
315 					/**
316 					 * Puts index information into map
317 					 */
318 					if (!indexes.containsKey(indexName)) {
319 						indexes.put(indexName, new ArrayList<>(4));
320 						numberOfNullableColumns.put(indexName, 0);
321 					}
322 
323 					indexes.get(indexName).add(columnName);
324 					numberOfNullableColumns.put(
325 						indexName,
326 						numberOfNullableColumns.get(indexName) +
327 						( schemaColumn.getNullable().get() ?  1 : 0 )
328 					);
329 					// :~)
330 				}
331 
332 				/**
333 				 * Fetch the least columns with non-null value
334 				 */
335 				Optional<Map.Entry<String, Integer>> notNullIndexWithLeastColumns = numberOfNullableColumns.entrySet().stream()
336 					.filter(
337 						// Without nullable column
338 						indexEntry -> indexEntry.getValue() == 0
339 					)
340 					.min((entryLeft, entryRight) ->
341 						Integer.compare(
342 							indexes.get(entryLeft.getKey()).size(),
343 							indexes.get(entryRight.getKey()).size()
344 						)
345 					);
346 
347 				if (notNullIndexWithLeastColumns.isPresent()) {
348 					List<String> nonNullKeys = indexes.get(
349 						notNullIndexWithLeastColumns.get().getKey()
350 					);
351 					logger.debug("Got non null keys: {}", nonNullKeys);
352 					return nonNullKeys;
353 				}
354 				// :~)
355 
356 				/**
357 				 * Fetch the least nullable value columns of index
358 				 */
359 				Optional<Map.Entry<String, Integer>> nullableIndexWithLeastColumns = numberOfNullableColumns.entrySet().stream()
360 					.min((entryLeft, entryRight) ->
361 						entryLeft.getValue().compareTo(
362 							entryRight.getValue()
363 						)
364 					);
365 
366 				if (nullableIndexWithLeastColumns.isPresent()) {
367 					List<String> nullableKeys = indexes.get(
368 						nullableIndexWithLeastColumns.get().getKey()
369 					);
370 					logger.debug("Got nullable keys: {}", nullableKeys);
371 					return nullableKeys;
372 				}
373 				// :~)
374 
375 				return Collections.<String>emptyList();
376 			}
377 		);
378 
379 		return supplier.asSupplier().get();
380 	}
381 }
382 
/**
 * Value holder for the canonical (catalog, schema, table) name of a table.
 */
class SchemaAndTableName {
	String catalog;
	String schema;
	String table;

	SchemaAndTableName(String catalogName, String schemaName, String tableName)
	{
		catalog = catalogName;
		schema = schemaName;
		table = tableName;
	}

	/**
	 * Renders the canonical name as {@code <catalog>.<schema>.<table>},
	 * showing {@code <null>} for an absent catalog or schema.
	 */
	@Override
	public String toString()
	{
		String shownCatalog = displayOf(catalog);
		String shownSchema = displayOf(schema);

		return String.format(
			"Cananical name(<catalog>.<schema>.<table>): [%s.%s.%s]",
			shownCatalog, shownSchema, table
		);
	}

	// Substitutes "<null>" for a missing identifier
	private static String displayOf(String identifier)
	{
		if (identifier != null) {
			return identifier;
		}

		return "<null>";
	}
}
405 }