001package org.opengion.plugin.daemon; 002 003import java.util.Date; 004import java.util.Locale; // 7.2.9.4 (2020/11/20) 005 006import javax.jms.QueueSession; 007 008// import org.hsqldb.lib.StringUtil; 009import org.opengion.fukurou.util.StringUtil; // 7.0.6.0 (2019/10/07) 010import org.opengion.fukurou.queue.QueueInfo; 011import org.opengion.fukurou.queue.QueueSend; 012import org.opengion.fukurou.queue.QueueSendFactory; 013import org.opengion.fukurou.util.HybsTimerTask; 014import org.opengion.hayabusa.common.HybsSystem; 015import org.opengion.hayabusa.queue.DBAccessQueue; 016 017/** 018 * メッセージキュー送信 019 * メッセージキュー送信テーブルを監視して、 020 * 送信処理を行います。 021 * 022 * @og.group メッセージ連携 023 * 024 * @og.rev 5.10.15.0 (2019/08/30) 新規作成 025 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動 026 * 027 * @version 5.0 028 * @author oota 029 * @since JDK7 030 * 031 */ 032public class Daemon_QueueSend extends HybsTimerTask { 033 /** このプログラムのVERSION文字列を設定します。 {@value} */ 034 private static final String VERSION = "7.2.9.4 (2020/11/20)" ; 035 036 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 037 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 038 private static final int LOOP_COUNTER = 24; 039 040// private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 041 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 042 private final String USER_ID = "CYYYYY"; 043 private final String PG_ID = "DMN_QueSnd"; 044 private final String DMN_NAME = "QueueReceiveDMN"; 045 private final DBAccessQueue dbAccessQueue; 046 047 private int loopCnt ; 048 private QueueSend queueSend; 049 050 /** 051 * コンストラクター 052 * 初期処理を行います。 053 */ 054 public Daemon_QueueSend(){ 055 super(); // 7.2.9.4 (2020/11/20) PMD:It is a good practice to call super() in a constructor 056 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 057 } 058 /** 059 * 開始処理 060 * タイマータスクのデーモン処理の開始ポイントです。 061 * 062 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 063 */ 064 @Override 065 protected void startDaemon() { 066 if (loopCnt % LOOP_COUNTER == 0) { 067 loopCnt = 1; 068 System.out.println(); 069// System.out.println(toString() + " " + new Date() + ""); 070 System.out.println(toString() + " " + new Date() ); // 7.2.9.4 (2020/11/20) PMD:Do not add empty strings 071 } else { 072 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 073 final String[][] vals = dbAccessQueue.selectGE65(); 074 075 // 取得データ分の繰り返し処理を実行する 076 for(int i = 0; i < vals.length; i++) { 077 final String[] record = vals[i]; 078 079 // GE65から取得した値を変数に格納 080 final String ykno = record[0]; 081 final String queueId = record[1]; 082 final String message = record[2]; 083 final String dedupliId = record[3]; 084 final String queSyu = record[4]; 085 final String jmsUrl = record[5]; 086 087// final String queueType = queSyu.toUpperCase(); 088 final String queueType = queSyu.toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 089 queueSend = QueueSendFactory.newQueueSend(queueType); 090 091 // 接続処理 092 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 093 094 // メッセージ送信管理テーブルから取得したデータを送信実装予定 095 final QueueInfo queueInfo = new QueueInfo(); 096 097 // 応答確認種別 098 if("MQ".equals(queueType)){ 099 // MQメッセージサーバ指定時 100 queueInfo.setMqTransacted(false); 101 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 102 // キュー名 103 queueInfo.setMqQueueName(queueId); 104 }else if("SQS".equals(queueType)){ 105 // SQSメッセージサーバ指定時 106 // グループID 107 queueInfo.setSqsFifoGroupId(queueId); 108 if(!StringUtil.isEmpty(dedupliId)) { 109 // 重複排除ID 110 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 111 queueInfo.setSqsFifoDedupliId(dedupliId); 112 } 113 } 114 115 // メッセージ 116 queueInfo.setMessage(message); 117 118 // 完了フラグを処理中:2に更新 119 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS); 120 121 // メッセージ送信処理 122 try{ 123 queueSend.sendMessage(queueInfo); 124 125 // 完了フラグを完了:3に更新 126 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END); 127 128 }catch(Exception e) { 129 // 完了フラグをエラー:4に更新して、エラー情報を登録 130 dbAccessQueue.updateGE66Error(ykno, e.getMessage()); 131 } 132 } 133 134 // クローズ処理 135 queueSend.close(); 136 137 loopCnt++; 138 } 139 } 140}