基於WGAN-GP方法的時間序列信號生成(以軸承振動信號為例)

2024年2月6日 17点热度 0人点赞

生成對抗網絡GAN作為非監督學習,由生成器和判別器兩個神經網絡構成。生成器從潛在空間中隨機取樣作為輸入,試圖生成與真實樣本數據相仿的數據。判別器的輸入則為真實樣本數據或生成器生成數據,進而判斷其輸入是真實數據還是生成數據。兩個網絡相互對抗、不斷調整參數,最終目的是使判別網絡無法判斷生成網絡的輸出結果是否真實。

WGAN作為GAN的改進模型,使用Wasserstein距離來替代JS散度作為優化目標,從根本上解決了GAN的梯度消失、訓練不穩定以及模式崩潰的問題。但WGAN采用權重截斷方式來滿足Lipschitz連續性條件,將導致部分權重最終落在上下限附近,使神經網絡的權重走向極端(取最大值,或取最小值),極大地限制了其擬合能力,導致梯度消失或爆炸。因此在原網絡損失函數基礎上加入懲罰項,實現Lipschitz約束,以彌補WGAN的缺陷,即WGAN-GP模型。

提出一種基於WGAN-GP方法的時間序列信號生成方法,並以軸承振動信號為例進行說明。

關於python的集成環境,采用的Winpython環境,IDE為spyder(類MATLAB界面)。

winpython脫胎於pythonxy,面向科學計算兼顧數據分析與挖掘;Anaconda主要面向數據分析與挖掘方面,在大數據處理方面有自己特色的一些包;winpython強調便攜性,被做成綠色軟件,不寫入註冊表,安裝其實就是解壓到某個文件夾,移動文件夾甚至放到U盤裡在其他電腦上也能用;Anaconda則算是傳統的軟件模式。winpython是由個人維護;Anaconda由數據分析服務公司維護,意味著Winpython在很多方面都從簡,而Anaconda會提供一些人性化設置。Winpython 隻能在windows上用,Anaconda則有linux的版本。

拋開軟件包的差異,我個人也推薦初學者用winpython,正因為其簡單,問題也少點,由於便攜性的特點系統壞了,重裝後也能直接用。

請直接安裝、使用winPython:WinPython download因為很多模塊以及集成的模塊

程序部分代碼如下:

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import scipy.io as sio
import numpy as np
import math
import os
import sys 
import matplotlib.pyplot as plt
import time 
import matplotlib.gridspec as gridspec
import preprocess_for_gan
import argparse
args = None
#相關參數,以B007(滾動體輕微故障)為目標生成樣本
def parse_args():
    parser = argparse.ArgumentParser(description='GAN')
    # basic parameters
    parser.add_argument('--phase', type=str, default='train', help='train or generate')
    parser.add_argument('--GAN_type', type=str, default='WGAN-GP', help='the name of the model: WGAN, WGAN-GP, DCGAN, LSGAN, SNGAN, RSGAN, RaSGAN')
    parser.add_argument('--data_dir', type=str, default= 'data\\0HP', help='the directory of the data')
    parser.add_argument('--target', type=str, default='B007', help='target signal to generate')
    parser.add_argument('--imbalance_ratio', type=int, default=100, help='imbalance ratio between major class samples and minor class samples')
    parser.add_argument('--checkpoint_dir', type=str, default='samples\WGAN-GP\ORDER\ratio-50\orderIR021-05-17-20_22\checkpoint\model-97000', help='the saved checkpoint to generate signals')
    parser.add_argument('--batch_size', type=int, default=5, help='batchsize of the training process')
    parser.add_argument('--swith_threshold', type=int, default=2.5, help='threshold of G-D loss difference for determining to train G or D')
    parser.add_argument('--normalization', type=str, default='minmax', help='way to process data: minmax or mean')
    parser.add_argument('--sampling', type=str, default='order', help='way to sample signals from original dataset: enc, order, random')
    # optimization information
    parser.add_argument('--lr', type=float, default=2e-4, help='the initial learning rate')
    parser.add_argument('--epsilon', type=float, default=1e-14, help='if epsilon is too big, training of DCGAN is failure')
    # save, load and display information
    parser.add_argument('--max_epoch', type=int, default=30000, help='max number of epoch')
    parser.add_argument('--sample_step', type=int, default=1000, help='the interval of log training information')
    args = parser.parse_args()
    return args
def get_sin_training_data(shape):
    '''
    使用預加載的數據_正弦信號
    '''
    half_T = 30 # T/2 of sin function
    length = shape[0] * shape[1]
    array = np.arange(0, length)
    ori_data = np.sin(array*np.pi/half_T)
    training_data = np.reshape(ori_data, shape)
    return training_data
def shuffle_set(x, y):
    size = np.shape(x)[0]
    x_row = np.arange(0, size)
    permutation = np.random.permutation(x_row.shape[0])
    x_shuffle = x[permutation,:,:]
    y_shuffle = np.array(y)[permutation]
    return x_shuffle, y_shuffle
def get_batch(x, y, now_batch, batch_size, total_batch):
    if now_batch < total_batch - 1:
        x_batch = x[now_batch*batch_size:(now_batch 1)*batch_size,:]
        y_batch = y[now_batch*batch_size:(now_batch 1)*batch_size]
    else:
        x_batch = x[now_batch*batch_size:,:]
        y_batch = y[now_batch*batch_size:]
    return x_batch, y_batch
def plot(samples):
    num = np.size(samples,0)
    length = np.size(samples,1)
    fig = plt.figure()
    gs = gridspec.GridSpec(num,1)
    gs.update(wspace = 0.05, hspace = 0.05)
    samples = np.reshape(samples, [num,length])
    x = np.arange(0, length)
    for i in range(num):
        ax = plt.subplot(gs[i,0])
        y = samples[i]
        ax.plot(x, y)
    return fig
def deconv(inputs, shape, strides, out_num, is_sn=False):
    # input [X_batch, in_channels, in_width] // 2D [X_batch, height, width, in_channels]
    # shape [filter_width, output_channels, in_channels]
    filters = tf.get_variable("kernel", shape=shape, initializer=tf.random_normal_initializer(stddev=0.02))
    bias = tf.get_variable("bias", shape=[shape[-2]], initializer=tf.constant_initializer([0]))
    if is_sn:
        return tf.nn.conv1d_transpose(inputs, spectral_norm("sn", filters), out_num, strides)   bias
    else:
        return tf.nn.conv1d_transpose(inputs, filters, out_num, strides, "SAME")   bias
def conv(inputs, shape, strides, is_sn=False):
    filters = tf.get_variable("kernel", shape=shape, initializer=tf.random_normal_initializer(stddev=0.02))
    bias = tf.get_variable("bias", shape=[shape[-1]], initializer=tf.constant_initializer([0]))
    if is_sn:
        return tf.nn.conv1d(inputs, spectral_norm("sn", filters), strides, "SAME")   bias
    else:
        return tf.nn.conv1d(inputs, filters, strides, "SAME")   bias
def fully_connected(inputs, num_out, is_sn=False):
    W = tf.get_variable("W", [inputs.shape[-1], num_out], initializer=tf.random_normal_initializer(stddev=0.02))
    b = tf.get_variable("b", [num_out], initializer=tf.constant_initializer([0]))
    if is_sn:
        return tf.matmul(inputs, spectral_norm("sn", W))   b
    else:
        return tf.matmul(inputs, W)   b
def leaky_relu(inputs, slope=0.2):
    return tf.maximum(slope*inputs, inputs)
def spectral_norm(name, w, iteration=1):
    #Spectral normalization which was published on ICLR2018,please refer to "https://www.researchgate.net/publication/318572189_Spectral_Normalization_for_Generative_Adversarial_Networks"
    #This function spectral_norm is forked from "https://github.com/taki0112/Spectral_Normalization-Tensorflow"
    w_shape = w.shape.as_list()
    w = tf.reshape(w, [-1, w_shape[-1]])
    with tf.variable_scope(name, reuse=False):
        u = tf.get_variable("u", [1, w_shape[-1]], initializer=tf.truncated_normal_initializer(), trainable=False)
    u_hat = u
    v_hat = None
    def l2_norm(v, eps=1e-12):
        return v / (tf.reduce_sum(v ** 2) ** 0.5   eps)
    for i in range(iteration):
        v_ = tf.matmul(u_hat, tf.transpose(w))
        v_hat = l2_norm(v_)
        u_ = tf.matmul(v_hat, w)
        u_hat = l2_norm(u_)
    sigma = tf.matmul(tf.matmul(v_hat, w), tf.transpose(u_hat))
    w_norm = w / sigma
    with tf.control_dependencies([u.assign(u_hat)]):
        w_norm = tf.reshape(w_norm, w_shape)
    return w_norm
def mapping(x, epsilon):
    max = np.max(x)
    min = np.min(x)
    return (x - min) * 255.0 / (max - min   epsilon)
def instanceNorm(inputs, epsilon):
    mean, var = tf.nn.moments(inputs, axes=[1], keep_dims=True) # axes=[1,2]
    scale = tf.get_variable("scale", shape=mean.shape[-1], initializer=tf.constant_initializer([1.0]))
    shift = tf.get_variable("shift", shape=mean.shape[-1], initializer=tf.constant_initializer([0.0]))
    return (inputs - mean) * scale / (tf.sqrt(var   epsilon))   shift
def make_dir(args):
    now = time.strftime("%m-%d-%H_%M", time.localtime(time.time()))
    # samples_dir 
    # if args.phase == 'train':
    samples_dir = "samples/" args.GAN_type '/' 'ratio-' str(args.imbalance_ratio) '/' args.sampling args.target '-' now
    if not os.path.exists(samples_dir):
        os.makedirs(samples_dir)
    figure_dir = samples_dir   "/figure"
    if not os.path.exists(figure_dir):
        os.makedirs(figure_dir)
    checkpoint_dir = samples_dir   "/checkpoint"
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)
    signals_dir = samples_dir   "/signals"
    if not os.path.exists(signals_dir):
        os.makedirs(signals_dir)
    return samples_dir, figure_dir, checkpoint_dir, signals_dir

部分出圖如下:

生成的mat文件如下:

完整代碼如下:
https://mbd.pub/o/bread/mbd-Y5yamJ5y

工學博士,擔任《Mechanical System and Signal Processing》審稿專傢,擔任
《中國電機工程學報》優秀審稿專傢,《控制與決策》,《系統工程與電子技術》,《電力系統保護與控制》,《宇航學報》等EI期刊審稿專傢。

擅長領域:現代信號處理,機器學習,深度學習,數字孿生,時間序列分析,設備缺陷檢測、設備異常檢測、設備智能故障診斷與健康管理PHM等。