001 /*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016 package org.opengion.fukurou.process;
017
018 import org.opengion.fukurou.util.Argument;
019 import org.opengion.fukurou.util.SystemParameter;
020 import org.opengion.fukurou.util.LogWriter;
021 import org.opengion.fukurou.util.HybsEntry ;
022 import org.opengion.fukurou.util.Closer;
023 import org.opengion.fukurou.model.Formatter;
024 import org.opengion.fukurou.db.ConnectionFactory;
025
026 import java.util.Map ;
027 import java.util.LinkedHashMap ;
028
029 import java.sql.Connection;
030 import java.sql.PreparedStatement;
031 import java.sql.ParameterMetaData;
032 import java.sql.SQLException;
033
034 /**
035 * Process_DBMerge は、UPDATE と INSERT を指定し ??タベ?スを追?新
036 * する、ChainProcess インターフェースの実?ラスです?
037 * 上?プロセスチェインの??タは上流から下流へと渡されます?)から
038 * 受け取っ?LineModel を?に、DBTableModel 形式ファイルを?力します?
039 *
040 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に
041 * 設定された接?Connection)を使用します?
042 *
043 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ??
044 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に
045 * 繋げてください?
046 *
047 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?
048 *
049 * @og.formSample
050 * Process_DBMerge -dbid=DBGE -insertTable=GE41
051 *
052 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規?
053 * [ -update=検索SQL? ] ??-update="UPDATE GE41 SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME]
054 * WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]"
055 * [ -updateFile=登録SQL?ァ??? ] ??-updateFile=update.sql
056 * ?? -update ?-updateFile が指定されな??合?、エラーです?
057 * [ -update_XXXX=固定? ] ??-update_SYSTEM_ID=GE
058 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
059 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
060 * [ -insertTable=登録???゙ルID ] ??INSERT??する?合?不要?INSERT する場合???ブルID
061 * [ -insert=検索SQL? ] ??-insert="INSERT INTO GE41 (SYSTEM_ID,CLM,NAME_JA,LABEL_NAME)
062 * VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])"
063 * [ -insertFile=登録SQL?ァ??? ] ??-insertFile=insert.sql
064 * ?? -insert ?-insertFile ??-table が指定されな??合?、エラーです?
065 * [ -insert_XXXX=固定? ] ??-insert_SYSTEM_ID=GE
066 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
067 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
068 * [ -const_XXXX=固定? ] ??-const_FGJ=1
069 * LineModel のキー(const_ に続く??)の値に、固定?を設定します?
070 * キーが異なれ?、?のカラ?を指定できます?
071 * [ -commitCnt=commit処?定] ???数毎にコミットを発行します?0 の場合?、終?でコミットしません?
072 * [ -display=false|true ] ??結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
073 *
074 * @version 4.0
075 * @author Kazuhiko Hasegawa
076 * @since JDK5.0,
077 */
078 public class Process_DBMerge extends AbstractProcess implements ChainProcess {
079 private static final String UPDATE_KEY = "update_" ;
080 private static final String INSERT_KEY = "insert_" ;
081 private static final String CNST_KEY = "const_" ;
082
083 private Connection connection = null;
084 private PreparedStatement insPstmt = null ;
085 private PreparedStatement updPstmt = null ;
086 private ParameterMetaData insPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対?
087 private ParameterMetaData updPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対?
088 private boolean useParamMetaData = false; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対?
089
090 private String dbid = null;
091 private String insert = null;
092 private String update = null;
093 private String insertTable = null;
094 private int[] insClmNos = null; // insert 時?ファイルのヘッ??のカラ?号
095 private int[] updClmNos = null; // update 時?ファイルのヘッ??のカラ?号
096 private int commitCnt = 0; // コミットするまとめ件数
097 private boolean display = false; // 表示しな?
098
099 private String[] cnstClm = null; // 固定?を設定するカラ?
100 private int[] cnstClmNos = null; // 固定?を設定するカラ?号
101 private String[] constVal = null; // カラ?号に対応した固定?
102
103 private boolean firstRow = true; // ??の?目
104 private int count = 0;
105 private int insCount = 0;
106 private int updCount = 0;
107
108 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map
109 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map
110
111 static {
112 mustProparty = new LinkedHashMap<String,String>();
113
114 usableProparty = new LinkedHashMap<String,String>();
115 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
116 usableProparty.put( "update", "更新SQL?sql or sqlFile ??)" +
117 CR + "? \"UPDATE GE41 " +
118 CR + "SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] " +
119 CR + "WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]\"" );
120 usableProparty.put( "updateFile", "更新SQLファイル(sql or sqlFile ??)? update.sql" );
121 usableProparty.put( "update_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
122 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
123 usableProparty.put( "insert", "登録SQL?sql or sqlFile ??)" +
124 CR + "? \"INSERT INTO GE41 " +
125 CR + "(SYSTEM_ID,CLM,NAME_JA,LABEL_NAME) " +
126 CR + "VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])\"" );
127 usableProparty.put( "insertFile", "登録SQLファイル(sql or sqlFile ??)? insert.sql" );
128 usableProparty.put( "insertTable", "INSERT する場合???ブルID SQL??する?合?不要?" );
129 usableProparty.put( "insert_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
130 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
131 usableProparty.put( "const_", "LineModel のキー(const_ に続く??)の値に、固定?? +
132 CR + "設定します?キーが異なれ?、?のカラ?を指定できます?" +
133 CR + "? -sql_SYSTEM_ID=GE" );
134 usableProparty.put( "commitCnt", "?数毎にコミットを発行します?" +
135 CR + "0 の場合?、終?でコミットしません(初期値: 0)" );
136 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? +
137 CR + "(初期値:false:表示しな?" );
138 }
139
140 /**
141 * ?ォルトコンストラクター?
142 * こ?クラスは、動??されます??ォルトコンストラクターで?
143 * super クラスに対して、?な初期化を行っておきます?
144 *
145 */
146 public Process_DBMerge() {
147 super( "org.opengion.fukurou.process.Process_DBMerge",mustProparty,usableProparty );
148 }
149
150 /**
151 * プロセスの初期化を行います?初めに??、呼び出されます?
152 * 初期処?ファイルオープン??オープン?に使用します?
153 *
154 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
155 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData ?ConnectionFactory経由で取得?(PostgreSQL対?
156 *
157 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
158 */
159 public void init( final ParamProcess paramProcess ) {
160 Argument arg = getArgument();
161
162 insertTable = arg.getProparty("insertTable");
163 update = arg.getFileProparty("update","updateFile",false);
164 insert = arg.getFileProparty("insert","insertFile",false);
165 commitCnt = arg.getProparty("commitCnt",commitCnt);
166 display = arg.getProparty("display",display);
167
168 dbid = arg.getProparty("dbid");
169 connection = paramProcess.getConnection( dbid );
170 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
171 // useParamMetaData = ApplicationInfo.useParameterMetaData( connection );
172 useParamMetaData = ConnectionFactory.useParameterMetaData( dbid ); // 5.3.8.0 (2011/08/01)
173
174 if( insert == null && insertTable == null ) {
175 String errMsg = "insert また?、insertFile を指定しな??合?、insertTable を??してください?;
176 throw new RuntimeException( errMsg );
177 }
178
179 if( insert != null && insertTable != null ) {
180 String errMsg = "insert また?、insertFile と、insertTable は、両方同時に?できません?"
181 + insert + "],[" + insertTable + "]";
182 throw new RuntimeException( errMsg );
183 }
184
185 // 3.8.0.1 (2005/06/17) {@DATE.XXXX} 変換処??追?
186 // {@DATE.YMDH} などの??を?yyyyMMddHHmmss 型?日付に置き換えます?
187 // SQL?? {@XXXX} ??の固定?への置き換?
188 HybsEntry[] entry =arg.getEntrys(UPDATE_KEY); // 配?
189 SystemParameter sysParam = new SystemParameter( update );
190 update = sysParam.replace( entry );
191
192 if( insert != null ) {
193 entry =arg.getEntrys(INSERT_KEY); // 配?
194 sysParam = new SystemParameter( insert );
195 insert = sysParam.replace( entry );
196 }
197
198 HybsEntry[] cnstKey = arg.getEntrys( CNST_KEY ); // 配?
199 int csize = cnstKey.length;
200 cnstClm = new String[csize];
201 constVal = new String[csize];
202 for( int i=0; i<csize; i++ ) {
203 cnstClm[i] = cnstKey[i].getKey();
204 constVal[i] = cnstKey[i].getValue();
205 }
206 }
207
208 /**
209 * プロセスの終?行います??に??、呼び出されます?
210 * 終???ファイルクローズ??クローズ?に使用します?
211 *
212 * @og.rev 4.0.0.0 (2007/11/27) commit,rollback,remove 処?追?
213 * @og.rev 5.1.2.0 (2010/01/01) insPmeta , updPmeta のクリア
214 *
215 * @param isOK ト?タルで、OK?たかど?[true:成功/false:失敗]
216 */
217 public void end( final boolean isOK ) {
218 boolean flag1 = Closer.stmtClose( updPstmt );
219 updPstmt = null;
220 boolean flag2 = Closer.stmtClose( insPstmt );
221 insPstmt = null;
222
223 insPmeta = null ; // 5.1.2.0 (2010/01/01)
224 updPmeta = null ; // 5.1.2.0 (2010/01/01)
225
226 // close に失敗して?のに commit しても良??か?
227 if( isOK ) {
228 Closer.commit( connection );
229 }
230 else {
231 Closer.rollback( connection );
232 }
233 ConnectionFactory.remove( connection,dbid );
234
235 if( ! flag1 ) {
236 String errMsg = "update ス??トメントをクローズ出来ません? + CR
237 + " update=[" + update + "] , commit=[" + isOK + "]" ;
238 throw new RuntimeException( errMsg );
239 }
240
241 if( ! flag2 ) {
242 String errMsg = "insert ス??トメントをクローズ出来ません? + CR
243 + " insert=[" + insert + "] , commit=[" + isOK + "]" ;
244 throw new RuntimeException( errMsg );
245 }
246 }
247
248 /**
249 * 引数の LineModel を??るメソ?です?
250 * 変換処?? LineModel を返します?
251 * 後続??行わな?????タのフィルタリングを行う場?は?
252 * null ??タを返します?つまり?null ??タは、後続??行わな?
253 * フラグの代わりにも使用して?す?
254 * なお?変換処?? LineModel と、オリジナルの LineModel が?
255 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す?
256 * ドキュメントに明記されて???合?、副作用が問題になる?合??
257 * ???とに自?コピ?(クローン)して下さ??
258 *
259 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
260 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData setNull 対?PostgreSQL対?
261 *
262 * @param data ラインモ? オリジナルのLineModel
263 *
264 * @return 処?換後?LineModel
265 */
266 public LineModel action( final LineModel data ) {
267 count++ ;
268 int updCnt = 0;
269 try {
270 if( firstRow ) {
271 makePrepareStatement( insertTable,data );
272
273 int size = cnstClm.length;
274 cnstClmNos = new int[size];
275 for( int i=0; i<size; i++ ) {
276 cnstClmNos[i] = data.getColumnNo( cnstClm[i] );
277 }
278
279 firstRow = false;
280 }
281
282 // 固定?置き換え??
283 for( int j=0; j<cnstClmNos.length; j++ ) {
284 data.setValue( cnstClmNos[j],constVal[j] );
285 }
286
287 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
288 if( useParamMetaData ) {
289 for( int i=0; i<updClmNos.length; i++ ) {
290 int type = updPmeta.getParameterType( i+1 );
291 // 5.3.8.0 (2011/08/01) setNull 対?
292 // updPstmt.setObject( i+1,data.getValue(updClmNos[i]),type );
293 Object val = data.getValue(updClmNos[i]);
294 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) {
295 updPstmt.setNull( i+1, type );
296 }
297 else {
298 updPstmt.setObject( i+1, val, type );
299 }
300 }
301 }
302 else {
303 for( int i=0; i<updClmNos.length; i++ ) {
304 updPstmt.setObject( i+1,data.getValue(updClmNos[i]) );
305 }
306 }
307
308 updCnt = updPstmt.executeUpdate();
309 if( updCnt == 0 ) {
310 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
311 if( useParamMetaData ) {
312 for( int i=0; i<insClmNos.length; i++ ) {
313 int type = insPmeta.getParameterType( i+1 );
314 // 5.3.8.0 (2011/08/01) setNull 対?
315 // insPstmt.setObject( i+1,data.getValue(insClmNos[i]),type );
316 Object val = data.getValue(insClmNos[i]);
317 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) {
318 insPstmt.setNull( i+1, type );
319 }
320 else {
321 insPstmt.setObject( i+1, val, type );
322 }
323 }
324 }
325 else {
326 for( int i=0; i<insClmNos.length; i++ ) {
327 insPstmt.setObject( i+1,data.getValue(insClmNos[i]) );
328 }
329 }
330 int insCnt = insPstmt.executeUpdate();
331 if( insCnt == 0 ) {
332 String errMsg = "?件も追?れませんでした? + CR
333 + " insert=[" + insert + "]" + CR
334 + "[" + data.getRowNo() + "]件目" + CR ;
335 throw new RuntimeException( errMsg );
336 }
337 insCount++ ;
338 }
339 else if( updCnt > 1 ) {
340 String errMsg = "?行が同時に更新されました?" + updCnt + "]件" + CR
341 + " update=[" + update + "]" + CR
342 + "[" + data.getRowNo() + "]件目" + CR ;
343 throw new RuntimeException( errMsg );
344 }
345 else {
346 updCount ++ ;
347 }
348
349 if( commitCnt > 0 && ( count%commitCnt == 0 ) ) {
350 Closer.commit( connection );
351 }
352 if( display ) { printKey( count,updCnt,data ); }
353 }
354 catch (SQLException ex) {
355 String errMsg = "登録処?エラーが発生しました? + CR
356 + ((updCnt == 1) ?
357 " update=[" + update + "]"
358 : " insert=[" + insert + "]" + CR
359 + " insertTable=[" + insertTable + "]" )
360 + CR
361 + "[" + data.getRowNo() + "]件目" + CR
362 + "errorCode=[" + ex.getErrorCode() + "] State=["
363 + ex.getSQLState() + "]" + CR ;
364 throw new RuntimeException( errMsg,ex );
365 }
366 return data;
367 }
368
369 /**
370 * ?で使用する PreparedStatement を作?します?
371 * 引数?? SQL また?、LineModel から作?した SQL より構築します?
372 *
373 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
374 *
375 * @param table 処?象の??ブルID
376 * @param data ラインモ? 処?象のLineModel
377 */
378 private void makePrepareStatement( final String table,final LineModel data ) {
379 if( insert == null ) {
380 StringBuilder buf = new StringBuilder();
381 String[] names = data.getNames();
382 int size = names.length;
383
384 buf.append( "INSERT INTO " ).append( table ).append( " (" );
385 buf.append( names[0] );
386 for( int i=1; i<size; i++ ) {
387 buf.append( "," ).append( names[i] );
388 }
389 buf.append( " ) VALUES ( ?" );
390 for( int i=1; i<size; i++ ) {
391 buf.append( ",?" );
392 }
393 buf.append( " )" );
394 insert = buf.toString();
395
396 // カラ?号を設定します?
397 insClmNos = new int[size];
398 for( int i=0; i<size; i++ ) {
399 insClmNos[i] = i;
400 }
401 }
402 else {
403 Formatter format = new Formatter( data );
404 format.setFormat( insert );
405 insert = format.getQueryFormatString();
406 insClmNos = format.getClmNos();
407 }
408
409 Formatter format = new Formatter( data );
410 format.setFormat( update );
411 update = format.getQueryFormatString();
412 updClmNos = format.getClmNos();
413
414 try {
415 insPstmt = connection.prepareStatement( insert );
416 updPstmt = connection.prepareStatement( update );
417 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
418 if( useParamMetaData ) {
419 insPmeta = insPstmt.getParameterMetaData();
420 updPmeta = updPstmt.getParameterMetaData();
421 }
422 }
423 catch (SQLException ex) {
424 String errMsg = "PreparedStatement を取得できませんでした? + CR
425 + "insert=[" + insert + "]" + CR
426 + "update=[" + update + "]" + CR
427 + "table=[" + table + "]" + CR
428 + "nameLine=[" + data.nameLine() + "]" ;
429 throw new RuntimeException( errMsg,ex );
430 }
431 }
432
433 /**
434 * 画面出力用のフォーマットを作?します?
435 *
436 * @param rowNo ??タ読み取り件数
437 * @param updCnt 更新件数
438 * @param data ラインモ?
439 */
440 private void printKey( final int rowNo , final int updCnt , final LineModel data ) {
441 StringBuilder buf = new StringBuilder();
442
443 if( updCnt > 0 ) { buf.append( "UPDATE " ); }
444 else { buf.append( "INSERT " ); }
445
446 buf.append( "row=[" ).append( rowNo ).append( "] : " );
447 for( int i=0; i < updClmNos.length; i++ ) {
448 if( i == 0 ) { buf.append( "key: " ); }
449 else { buf.append( " and " ); }
450 buf.append( data.getName( updClmNos[i] ) );
451 buf.append( " = " );
452 buf.append( data.getValue( updClmNos[i] ) );
453 }
454
455 println( buf.toString() );
456 }
457
458 /**
459 * プロセスの処?果のレポ?ト表現を返します?
460 * 処??ログラ?、?力件数、?力件数などの??です?
461 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ?
462 * 形式で出してください?
463 *
464 * @return 処?果のレポ??
465 */
466 public String report() {
467 String report = "[" + getClass().getName() + "]" + CR
468 + TAB + "DBID : " + dbid + CR
469 + TAB + "Input Count : " + count + CR
470 + TAB + "Update Count : " + updCount + CR
471 + TAB + "Insert Count : " + insCount ;
472
473 return report ;
474 }
475
476 /**
477 * こ?クラスの使用方法を返します?
478 *
479 * @return こ?クラスの使用方?
480 */
481 public String usage() {
482 StringBuilder buf = new StringBuilder();
483
484 buf.append( "Process_DBMerge は、UPDATE と INSERT を指定し ??タベ?スを追?新" ).append( CR );
485 buf.append( "する、ChainProcess インターフェースの実?ラスです?" ).append( CR );
486 buf.append( "上?プロセスチェインの??タは上流から下流へと渡されます?)から" ).append( CR );
487 buf.append( "受け取っ?LineModel を?に、データベ?スの存在チェ?を行い? ).append( CR );
488 buf.append( "下流への処?振り?けます?" ).append( CR );
489 buf.append( CR );
490 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR );
491 buf.append( "設定された接?Connection)を使用します?" ).append( CR );
492 buf.append( CR );
493 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR );
494 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR );
495 buf.append( "繋げてください? ).append( CR );
496 buf.append( CR );
497 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR );
498 buf.append( CR ).append( CR );
499 buf.append( getArgument().usage() ).append( CR );
500
501 return buf.toString();
502 }
503
504 /**
505 * こ?クラスは、main メソ?から実行できません?
506 *
507 * @param args コマンド引数配?
508 */
509 public static void main( final String[] args ) {
510 LogWriter.log( new Process_DBMerge().usage() );
511 }
512 }