001 /*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016 package org.opengion.fukurou.process;
017
018 import org.opengion.fukurou.util.Argument;
019 import org.opengion.fukurou.util.SystemParameter;
020 import org.opengion.fukurou.util.LogWriter;
021
022 import org.opengion.fukurou.util.HybsEntry ;
023 import org.opengion.fukurou.util.Closer;
024 import org.opengion.fukurou.db.ConnectionFactory;
025
026 import java.util.Set ;
027 import java.util.HashSet ;
028 import java.util.Map ;
029 import java.util.LinkedHashMap ;
030
031 import java.sql.Connection;
032 import java.sql.Statement;
033 import java.sql.ResultSet;
034 import java.sql.SQLException;
035
036 /**
037 * Process_BulkQueryは、データベ?スから読み取った?容を??処?るために?
038 * ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす?
039 * FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです?
040 *
041 * こ?クラスは、上流から?下流への処???度しか実行されません?
042 * FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します?
043 * ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します?
044 *
045 * FirstProcess では?action は、query のみです?
046 * query は、指定?SQL?実行し、結果のSetをParamProcessに設定します?
047 * ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます?
048 * query は、上記と同じです?
049 * minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します?
050 * intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します?
051 * bulkSet は、?のSetを取り?し?SQL??して処?ます?
052 * 流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?bulkSet で
053 * 利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?SQLServerのユニ?クキー?
054 * minusした結果を?ORACLEからDELETEすれば、不要な??タを削除するなどの処?実行可能になります?
055 * また?単純に、query ?を?チェインすれば、単発のUPDATE?実行することが可能です?
056 *
057 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に
058 * 設定された接?Connection)を使用します?
059 * DBID は、Process_DBParam の -configFile で?す?DBConfig.xml ファイルを使用します?
060 *
061 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ??
062 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に
063 * 繋げてください?
064 *
065 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?
066 *
067 * @og.formSample
068 * Process_BulkQuery -action=query -dbid=DBGE -sql="select KEY from TABLE_X"
069 *
070 * -action=処????) ??実行する??法を?しま?
071 * -action=query 単なるSQL?実行します?
072 * -action=bulkSet 実行したSQL??結果を?Set<String> オブジェクトに設定します?
073 * -action=minus Set<String> オブジェクトと、ここでの実行結果の差?とります?
074 * -action=intersect Set<String> オブジェクトと、ここでの実行結果の積?をとります?
075 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規?
076 * [ -sql=検索SQL? ] ??-sql="select * from GEA08"
077 * [ -sqlFile=検索SQLファイル ] ??-sqlFile=select.sql
078 * -sql= を指定しな??合?、ファイルで??してください?
079 * [ -sql_XXXX=固定? ] ??-sql_SYSTEM_ID=GE
080 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
081 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
082 * [ -bulkKey=XXXX ] ??-bulkKey=XXXX
083 * SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます?
084 * WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' )
085 * [ -bulkType=NUM|STR ] ??-bulType=STR
086 * Bulkの値を文字?に変換する場合に、数字型か??型を指定します?
087 * 数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換しま?初期値:STR)?
088 * [ -fetchSize=100 ] ?フェ?する行数(初期値:100)
089 * [ -display=false|true ] ?結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
090 * [ -debug=false|true ] ?デバッグ??を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
091 *
092 * @og.rev 5.3.4.0 (2011/04/01) 新規追?
093 * @version 4.0
094 * @author Kazuhiko Hasegawa
095 * @since JDK5.0,
096 */
097 public class Process_BulkQuery extends AbstractProcess implements FirstProcess , ChainProcess {
098 private static final int MAX_BULK_SET = 500 ; // ORACLE の制? 1000 なので?
099
100 private static final String ACT_QUERY = "query" ;
101 private static final String ACT_BULKSET = "bulkSet" ;
102 private static final String ACT_MINUS = "minus" ;
103 private static final String ACT_INTERSECT = "intersect" ;
104
105 private static final String[] ACTION_LST = new String[] { ACT_QUERY,ACT_BULKSET,ACT_MINUS,ACT_INTERSECT };
106
107 // private LineModel newData = null;
108
109 private String actionCmd = null; // SQL結果を加工(query:実行?minus:引き算?intersect:重??)
110 private String dbid = null; // メインDB接続ID
111
112 private String bulkKey = null;
113 private boolean bulkType = true; // true:STR , false:NUM
114
115 private int sqlCount = 0; // SQL??処?数
116 private int setCount = 0; // 取り出したSetの件数
117 private int outCount = 0; // マ?ジ後?Setの件数
118
119 private int fetchSize = 100;
120 private boolean display = false; // 表示しな?
121 private boolean debug = false; // ????
122 private boolean firstTime = true; // ??の?目
123
124 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map
125 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map
126
127 static {
128 mustProparty = new LinkedHashMap<String,String>();
129 mustProparty.put( "action", "実行する??法を?します?(query|minus|intersect)" );
130
131 usableProparty = new LinkedHashMap<String,String>();
132 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
133 usableProparty.put( "sql", "検索SQL?sql or sqlFile ??)? \"select * from GEA08\"" );
134 usableProparty.put( "sqlFile", "検索SQLファイル(sql or sqlFile ??)? select.sql" );
135 usableProparty.put( "sql_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
136 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
137 usableProparty.put( "dbid2", "DB接続ID2 ? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
138 usableProparty.put( "sql2", "検索SQL?(sql or sqlFile ??)? \"select * from GEA08\"" );
139 usableProparty.put( "sqlFile2", "検索SQLファイル2(sql or sqlFile ??)? select.sql" );
140 usableProparty.put( "sql2_", "SQL?中の{@XXXX}??を指定?固定?で置き換えます?" +
141 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
142 usableProparty.put( "bulkKey", "SQL?の{@XXXX}??をProcess_BulkQuery等で取得した?で置き換えます?" +
143 CR + "WHERE SYSTEM_ID IN ( {@XXXX} ) ?WHERE SYSTEM_ID IN ( 'AA','BB','CC' )" );
144 usableProparty.put( "bulkType", "Bulkの値を文字?に変換する場合に、文字型か?数字型を指定します?" +
145 CR + "数字型では、AA,BB,CC とし??型では?AA','BB','CC' に変換します?(初期値:STR)" );
146 usableProparty.put( "fetchSize","フェ?する行数 (初期値:100)" );
147 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? +
148 CR + "(初期値:false:表示しな?" );
149 usableProparty.put( "debug", "????を標準?力に表示する(true)かしな?false)? +
150 CR + "(初期値:false:表示しな?" );
151 }
152
153 /**
154 * ?ォルトコンストラクター?
155 * こ?クラスは、動??されます??ォルトコンストラクターで?
156 * super クラスに対して、?な初期化を行っておきます?
157 *
158 */
159 public Process_BulkQuery() {
160 super( "org.opengion.fukurou.process.Process_BulkQuery",mustProparty,usableProparty );
161 }
162
163 /**
164 * プロセスの初期化を行います?初めに??、呼び出されます?
165 * 初期処?ファイルオープン??オープン?に使用します?
166 *
167 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
168 *
169 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
170 */
171 public void init( final ParamProcess paramProcess ) {
172 Argument arg = getArgument();
173
174 actionCmd = arg.getProparty("action" , null , ACTION_LST );
175
176 fetchSize = arg.getProparty("fetchSize",fetchSize);
177 display = arg.getProparty("display",display);
178 debug = arg.getProparty("debug",debug);
179
180 dbid = arg.getProparty("dbid");
181 String sql = arg.getFileProparty("sql","sqlFile",true);
182 if( debug ) { println( "入力SQL:" + sql ); }
183
184 HybsEntry[] entry =arg.getEntrys( "sql_" ); //配?
185 SystemParameter sysParam = new SystemParameter( sql );
186 sql = sysParam.replace( entry );
187 if( debug ) { println( "変換SQL:" + sql ); }
188
189 if( ACT_BULKSET.equalsIgnoreCase( actionCmd ) ) {
190 bulkKey = arg.getProparty("bulkKey");
191 String bkType = arg.getProparty("bulkType");
192 if( bkType != null ) { bulkType = bkType.equalsIgnoreCase( "STR" ); }
193
194 Set<String> setData = paramProcess.getBulkData();
195 if( debug ) { println( setData.toString() ); }
196 setCount = setData.size();
197
198 if( setCount > 0 ) {
199 // 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
200 // sql = makeBulkQuery( sql,bulkKey,bulkType,setData );
201 // if( debug ) { println( "BulkSQL:" + sql ); }
202 // createSetData( paramProcess, dbid, sql );
203 String[] sqls = makeBulkQuery( sql,bulkKey,bulkType,setData );
204 for( int i=0; i<sqls.length; i++ ) {
205 if( debug ) { println( "BulkSQL:" + sqls[i] ); }
206 createSetData( paramProcess, dbid, sqls[i] );
207 }
208 }
209 }
210 else if( ACT_QUERY.equalsIgnoreCase( actionCmd ) ) {
211 Set<String> setData2 = createSetData( paramProcess, dbid, sql );
212 if( debug ) { println( setData2.toString() ); }
213 setCount = setData2.size();
214 outCount = setCount;
215 paramProcess.setBulkData( setData2 );
216 }
217 else {
218 Set<String> setData = paramProcess.getBulkData();
219 Set<String> setData2 = createSetData( paramProcess, dbid, sql );
220 setCount = setData2.size();
221
222 if( ACT_MINUS.equalsIgnoreCase( actionCmd ) ) {
223 setData.removeAll( setData2 );
224 }
225 else if( ACT_INTERSECT.equalsIgnoreCase( actionCmd ) ) {
226 setData.retainAll( setData2 );
227 }
228 outCount = setData.size();
229 if( debug ) { println( setData.toString() ); }
230 paramProcess.setBulkData( setData );
231 }
232 }
233
234 /**
235 * プロセスの終?行います??に??、呼び出されます?
236 * 終???ファイルクローズ??クローズ?に使用します?
237 *
238 * @param isOK ト?タルで、OK?たかど? [true:成功/false:失敗]
239 */
240 public void end( final boolean isOK ) {
241 // 何もありません?
242 }
243
244 /**
245 * こ???タの処?おいて、次の処?出来るかど?を問?わせます?
246 * こ?呼び出し1回毎に、次の??タを取得する準備を行います?
247 *
248 * @return 処?きる:true / 処?きな?false
249 */
250 public boolean next() {
251 return firstTime;
252 }
253
254 /**
255 * 引数の LineModel を??るメソ?です?
256 * 変換処?? LineModel を返します?
257 * 後続??行わな?????タのフィルタリングを行う場?は?
258 * null ??タを返します?つまり?null ??タは、後続??行わな?
259 * フラグの代わりにも使用して?す?
260 * なお?変換処?? LineModel と、オリジナルの LineModel が?
261 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す?
262 * ドキュメントに明記されて???合?、副作用が問題になる?合??
263 * ???とに自?コピ?(クローン)して下さ??
264 *
265 * @param data オリジナルのLineModel
266 *
267 * @return 処?換後?LineModel
268 */
269 @SuppressWarnings(value={"unchecked"})
270 public LineModel action( final LineModel data ) {
271 return data ;
272 }
273
274 /**
275 * ??に?行データである LineModel を作?しま?
276 * FirstProcess は、次?処?チェインして???の行データ?
277 * 作?して、後続? ChainProcess クラスに処?ータを渡します?
278 *
279 * @param rowNo 処?の行番号
280 *
281 * @return 処?換後?LineModel
282 */
283 public LineModel makeLineModel( final int rowNo ) {
284 firstTime = false; // ?しか処?な?め?false を設定する?
285
286 LineModel model = new LineModel();
287
288 model.setRowNo( rowNo );
289
290 return model;
291 }
292
293 /**
294 * ?で使用する Set オブジェクトを作?します?
295 * Exception 以外では、? Set<String> オブジェクトを返します?
296 *
297 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
298 *
299 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
300 * @param dbid 接続?ID
301 * @param sql 実行するSQL?検索系)
302 *
303 * @return 実行結果から取り出した、最初?カラ??みを集めた Setオブジェク?
304 * @throws RuntimeException ??タベ?ス処?できなかった?合?
305 */
306 private Set<String> createSetData( final ParamProcess paramProcess, final String dbid, final String sql ) {
307 Set<String> data = new HashSet<String>();
308
309 Connection connection = null;
310 Statement stmt = null;
311 ResultSet resultSet = null;
312
313 try {
314 connection = paramProcess.getConnection( dbid );
315 stmt = connection.createStatement();
316 if( fetchSize > 0 ) { stmt.setFetchSize( fetchSize ); }
317 if( stmt.execute( sql ) ) { // true:検索系 , false:更新系
318 resultSet = stmt.getResultSet();
319 while( resultSet.next() ) {
320 sqlCount++ ;
321 String str = resultSet.getString(1);
322 if( display ) { println( str ); }
323 data.add( str );
324 }
325 }
326 else {
327 // sqlCount = stmt.getUpdateCount(); // 5.3.9.0 (2011/09/01)
328 sqlCount += stmt.getUpdateCount();
329 }
330 }
331 catch (SQLException ex) {
332 String errMsg = "SQL を実行できませんでした? + CR
333 + "DBID=" + dbid + CR
334 + "SQL =" + sql ;
335 throw new RuntimeException( errMsg,ex );
336 }
337 finally {
338 Closer.resultClose( resultSet );
339 Closer.stmtClose( stmt );
340
341 ConnectionFactory.remove( connection,dbid );
342 }
343 return data;
344 }
345
346 /**
347 * ?で使用する Set オブジェクトを作?します?
348 * Exception 以外では、? Set<String[]> オブジェクトを返します?
349 *
350 * @og.rev 5.3.9.0 (2011/09/01) 1000件を?た?合?処?追?
351 *
352 * @param sql オリジナルのSQL?
353 * @param bulkKey ?処?置き換えるキー??
354 * @param bulkType ?型(true)か?数字型(false)を指?
355 * @param setData ?処???なるSetオブジェク?
356 *
357 * @return オリジナルのSQL?に ?処????と置換したSQL??配?
358 */
359 private String[] makeBulkQuery( final String sql, final String bulkKey, final boolean bulkType,final Set<String> setData ) {
360 String[] sqls = new String[ (setData.size()/MAX_BULK_SET) + 1 ];
361 int idx = 0;
362 int cnt = 0;
363
364 StringBuilder buf = new StringBuilder();
365 String bulkVal = null;
366 if( bulkType ) { // ??の場?
367 for( String key : setData ) {
368 cnt++;
369 buf.append( ",'" ).append( key ).append( "'" );
370 if( cnt >= MAX_BULK_SET ) {
371 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
372 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
373 cnt = 0;
374 buf = new StringBuilder();
375 }
376 }
377 if( cnt > 0 ) { // きっちりで終わらな???
378 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
379 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
380 }
381 }
382 else { // 数字?場?
383 for( String key : setData ) {
384 cnt++;
385 buf.append( "," ).append( key );
386 if( cnt >= MAX_BULK_SET ) {
387 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
388 sqls[idx++] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
389 cnt = 0;
390 buf = new StringBuilder();
391 }
392 }
393 if( cnt > 0 ) { // きっちりで終わらな???
394 bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
395 sqls[idx] = sql.replace( "{@" + bulkKey + "}" ,bulkVal );
396 }
397 }
398 // String bulkVal = buf.substring( 1 ); // 先?のコロンを?ずす
399
400 // return sql.replace( "{@" + bulkKey + "}" ,bulkVal );
401 return sqls;
402 }
403
404 /**
405 * プロセスの処?果のレポ?ト表現を返します?
406 * 処??ログラ?、?力件数、?力件数などの??です?
407 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ?
408 * 形式で出してください?
409 *
410 * @return 処?果のレポ??
411 */
412 public String report() {
413 String report = "[" + getClass().getName() + "]" + CR
414 + TAB + "Action : " + actionCmd + CR
415 + TAB + "DBID : " + dbid + CR
416 + TAB + "sqlCount : " + sqlCount + CR
417 + TAB + "setCount : " + setCount + CR
418 + TAB + "outCount : " + outCount ;
419
420 return report ;
421 }
422
423 /**
424 * こ?クラスの使用方法を返します?
425 *
426 * @return こ?クラスの使用方?
427 */
428 public String usage() {
429 StringBuilder buf = new StringBuilder();
430
431 buf.append( "Process_BulkQueryは、データベ?スから読み取った?容を??処?るために? ).append( CR );
432 buf.append( "ParamProcess のサブクラス(Process_DBParam)にセ?したり??したりす? ).append( CR );
433 buf.append( "FirstProcess と、ChainProcess のインターフェースを両方持った?実?ラスです?" ).append( CR );
434 buf.append( CR );
435 buf.append( "こ?クラスは、上流から?下流への処???度しか実行されません? ).append( CR );
436 buf.append( "FirstProcess の検索結果は、Set オブジェクトとして、Process_DBParam に渡します?" ).append( CR );
437 buf.append( "ChainProcess は、その結果を取り?し?自??身の処?果と合せて?します?" ).append( CR );
438 buf.append( CR );
439 buf.append( "FirstProcess では?action は、query のみです?" ).append( CR );
440 buf.append( " query は、指定?SQL?実行し、結果のSetをParamProcessに設定します?" ).append( CR );
441 buf.append( "ChainProcess では?action は、query、bulkSet、minus、intersect が指定できます?" ).append( CR );
442 buf.append( " query は、上記と同じです?" ).append( CR );
443 buf.append( " minus は、?のSetから、SQL??実行結果を引き算し、結果Setを?設定します?" ).append( CR );
444 buf.append( " intersect は、?のSetから、SQL??実行結果と重?る結果Setを?設定します?" ).append( CR );
445 buf.append( " bulkSet は、?のSetを取り?し?SQL??して処?ます?" ).append( CR );
446 buf.append( CR );
447 buf.append( "流れ?は、query で検索し?minusまた?intersect でSetオブジェクトを?し?" ).append( CR );
448 buf.append( "bulkSet で利用します?例えば、ORACLEから、ユニ?クキーのSetを作?し?" ).append( CR );
449 buf.append( "SQLServerのユニ?クキーをminusした結果を?ORACLEからDELETEすれば、不要な" ).append( CR );
450 buf.append( "??タを削除するなどの処?実行可能になります?また?単純に、query ?を?" ).append( CR );
451 buf.append( "チェインすれば、単発のUPDATE?実行することが可能です?" ).append( CR );
452 buf.append( CR );
453 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR );
454 buf.append( "設定された接?Connection)を使用します?" ).append( CR );
455 buf.append( CR );
456 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR );
457 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR );
458 buf.append( "繋げてください? ).append( CR );
459 buf.append( CR );
460 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR );
461 buf.append( CR ).append( CR );
462
463 buf.append( getArgument().usage() ).append( CR );
464
465 return buf.toString();
466 }
467
468 /**
469 * こ?クラスは、main メソ?から実行できません?
470 *
471 * @param args コマンド引数配?
472 */
473 public static void main( final String[] args ) {
474 LogWriter.log( new Process_BulkQuery().usage() );
475 }
476 }