كيفية القيام بالتنبؤ بالسلسلة الزمنية باستخدام RNNs و TensorFlow و Cloud ML Engine

واجهة برمجة تطبيقات Estimators في tf.contrib.learn هي طريقة ملائمة للغاية للبدء في استخدام TensorFlow. الشيء الرائع حقًا من وجهة نظري حول API Estimators هو أن استخدامه طريقة سهلة جدًا لإنشاء نماذج TensorFlow الموزعة. لا يتم توزيع العديد من عينات TensorFlow التي تراها عائمة على شبكة الإنترنت - إنها تفترض أنك ستقوم بتشغيل الشفرة على جهاز واحد. يبدأ الأشخاص بمثل هذه الشفرة وبعد ذلك يشعرون بالحزن الشديد عندما علموا أن شفرة TensorFlow منخفضة المستوى لا تعمل في الواقع على مجموعة بيانات كاملة. يجب عليهم بعد ذلك القيام بالكثير من العمل لإضافة شفرة تدريب موزعة حول العينة الأصلية ، ومن الذي يريد تعديل كود شخص آخر؟

ملاحظة: انتقلت مقدرات الآن إلى Tensorflow الأساسية. الكود المحدث الذي يستخدم tf.estimator بدلاً من tf.contrib.learn.estimator هو الآن في GitHub - استخدم الكود المحدث كنقطة انطلاق.

لذا ، من فضلك ، من فضلك ، من فضلك ، إذا شاهدت عينة TensorFlow لا تستخدم API Estimators ، فتجاهلها. سيكون هناك الكثير من العمل لجعله يعمل على إنتاج مجموعات بياناتك (اقرأ: كبيرة) - سيكون هناك مراقبين ومنسقين وخوادم معلمات وجميع أنواع أنظمة برمجة الجنون التي لا تريد الغوص فيها. ابدأ بـ API مقدّر واستخدم فئة التجربة. (إخلاء المسئولية: آرائي ، وليس آراء صاحب العمل).

يحتاج التنبؤ بالتسلسل الزمني إلى مقدّر مخصص

واجهة برمجة التطبيقات المقدرة تأتي مع مصنف الشبكة العصبية العميق وراجعها. إذا كانت لديك بيانات منظمة نموذجية ، فاتبع البرنامج التعليمي المرتبط أعلاه أو خذ هذه الدورة التدريبية من Google Cloud (ستتوفر قريبًا في Coursera) وستكون في طريقك لإنشاء نماذج للتعلم الآلي تعمل على مجموعات بيانات واقعية كبيرة في مستودع البيانات العلائقية الخاص بك. ولكن ماذا لو لم يكن لديك مشكلة بيانات منظمة نموذجية؟ في هذه الحالة ، ستحتاج غالبًا إلى إنشاء مقدّر مخصص. في هذا بلوق وظيفة ، وسوف تظهر لك كيف.

هناك نوع شائع من البيانات التي تريد القيام بالتعلم الآلي عليها هو بيانات السلاسل الزمنية. بشكل أساسي ، مدخلاتك هي مجموعة من الأرقام وتريد التنبؤ بالعدد التالي في هذا التسلسل. في هذه المقالة ، سأجعلها أكثر عمومية وأفترض أنك تريد توقع آخر رقمين من التسلسل. كما يقول المثل علوم الحاسب ، إذا كنت تستطيع عمل اثنين ، يمكنك أن تفعل N.

تسمى بنية الشبكة العصبية التقليدية المستخدمة في التنبؤ التسلسل إلى التسلسل بالشبكة العصبية المتكررة (RNN). راجع هذه المقالة وهذا للحصول على مقدمة يمكن الوصول إليها جداً إلى RNNs. لكنك لست بحاجة إلى معرفة كيفية تنفيذ RNN لاستخدام واحدة ، لذلك بمجرد أن تصبح هذه المقالات أعمق مما تريد ، استقال.

لمتابعة هذا المقال ، افتح دفتر Jupyter الخاص بي في نافذة متصفح أخرى. أنا فقط أعرض المقتطفات الرئيسية من الشفرة هنا. يحتوي دفتر الملاحظات (ومجلد GitHub) على جميع التعليمات البرمجية.

محاكاة بعض بيانات السلاسل الزمنية

عادة ما يكون من الأسهل تعلمها باستخدام مجموعة بيانات لعبة صغيرة يمكنك توليدها بقدر ما تريد. سوف تأتي البيانات الحقيقية مع المراوغات الخاصة بها! لذا ، فلننشئ مجموعة من بيانات السلاسل الزمنية. يتكون كل تسلسل من 10 أرقام. سنستخدم الثمانية الأوائل كمدخلات والآخران كمسميات (على سبيل المثال ، ما يجب التنبؤ به):

الكود لإنشاء تسلسلات السلاسل الزمنية هذه باستخدام numpy (np):

SEQ_LEN = 10
def create_time_series ():
  التكرار = (np.random.random () * 0.5) + 0.1 # 0.1 إلى 0.6
  ampl = np.random.random () + 0.5 # 0.5 إلى 1.5
  x = np.sin (np.arange (0 ، SEQ_LEN) * freq) * ampl
  عودة س

اكتب مجموعة من هذه السلاسل الزمنية التسلسلية إلى ملفات CSV (train.csv و valid.csv) ونحن في عمل. لدينا بيانات.

وظيفة الإدخال

الطريقة التي تعمل بها "مقدرات واجهة برمجة التطبيقات" في TensorFlow هي أنك تحتاج إلى توفير إدخال_قراء لقراءة بياناتك. أنت لا تقدم قيم x و y. بدلاً من ذلك ، يمكنك توفير دالة تقوم بإرجاع المدخلات والعلامات. المدخلات هي قاموس لجميع المدخلات الخاصة بك (اسم الإدخال إلى الموتر) والعلامات هي موتر.

في حالتنا ، يتكون ملف CSV لدينا ببساطة من 10 أرقام فاصلة عائمة. تعمل DEFAULTS على تحديد نوع البيانات للتنسورات. نريد أن نقرأ البيانات 20 خطوط في وقت واحد ؛ هذا هو BATCH_SIZE. الدفعة هي عدد العينات التي يتم إجراء نزول التدرج عليها. ستحتاج إلى تجربة هذا الرقم - إذا كان حجمه كبيرًا للغاية ، سيكون تدريبك بطيئًا وإذا كان صغيراً للغاية ، فلن يتقارب التدريب الخاص بك ولن يتقارب. نظرًا لأن لدينا مدخلات فقط ، فإن الاسم الذي تسميه هذا الإدخال ليس مهمًا حقًا. سوف نسميها البيانات الأولية.

عيوب = [[0.0] لـ x في xrange (0 ، SEQ_LEN)]
BATCH_SIZE = 20
TIMESERIES_COL = 'rawdata'
N_OUTPUTS = 2 # في كل تسلسل ، 1-8 عبارة عن ميزات ، و 9-10 عبارة عن تسمية
N_INPUTS = SEQ_LEN - N_OUTPUTS

لا ينبغي أن تأخذ المدخلات التي تريدها واجهة برمجة التطبيقات المقدرة أي معلمات. ومع ذلك ، فنحن نريد أن نكون قادرين على توفير اسم (أسماء) الملفات للقراءة في سطر الأوامر. لذلك ، دعنا نكتب دالة read_dataset () تُرجع إدخال_ fn.

# قراءة البيانات وتحويلها إلى التنسيق المطلوب
def read_dataset (اسم الملف ، الوضع = tf.contrib.learn.ModeKeys.TRAIN):
  def _input_fn ():
    num_epochs = 100 if mode == tf.contrib.learn.ModeKeys.TRAIN else 1

أول شيء نفعله هو تحديد عدد الحقبة. هذا هو عدد المرات التي نحتاج فيها لتصفح مجموعة البيانات. سنقوم بمراجعة مجموعة البيانات 100 مرة إذا كنا نتدرب ، ولكن مرة واحدة فقط إذا كنا نقوم بالتقييم.

بعد ذلك ، سنفعل توسع البطاقة. في كثير من الأحيان ، تُنتج برامج البيانات الكبيرة ملفات مظللة مثل train.csv-0001-of-0036 وهكذا ، نود ببساطة توفير train.csv * كمدخلات. نستخدم هذا لملء قائمة انتظار اسم ملف ثم نستخدم TextLineReader لقراءة البيانات:

# يمكن أن يكون مسار لملف واحد أو نمط ملف.
input_file_names = tf.train.match_filenames_once (اسم الملف)
filename_queue = tf.train.string_input_producer (
        input_file_names ، num_epochs = num_epochs ، خلط ورق اللعب = صحيح)
القارئ = tf.TextLineReader ()
    _ ، value = reader.read_up_to (filename_queue ، num_records = BATCH_SIZE)
value_column = tf.expand_dims (القيمة ، -1)

بعد ذلك ، نقوم بفك تشفير البيانات ، ونتعامل مع الأرقام الثمانية الأولى كمدخلات والآخران كتسمية. المدخلات ، عندما نقرأها ، هي قائمة مكونة من 8 تنسورات كل منها بحجم x. باستخدام tf.concat يجعلها تانسور بحجم 8xbatchsize واحد. هذا أمر مهم لأن API Estimators يريد التنسورات وليس قوائم.

# all_data هي قائمة من التنسورات
all_data = tf.decode_csv (value_column ، record_defaults = DEFAULTS)
المدخلات = all_data [: len (all_data) -N_OUTPUTS] # القيم القليلة الأولى
label = all_data [len (all_data) -N_OUTPUTS:] # # القيم القليلة الأخيرة
   
# من قائمة التنسورات إلى التنسورات ذات بعد واحد
المدخلات = tf.concat (المدخلات ، المحور = 1)
التسمية = tf.concat (التسمية ، المحور = 1)
طباعة 'المدخلات = {}'. التنسيق (المدخلات)
   
عرض {TIMESERIES_COL: المدخلات} ، والتسمية # dict of features ، label

تحديد RNN

إذا كنا نستخدم LinearRegressor ، DNNRegressor ، DNNLinearCombinedRegressor ، وما إلى ذلك ، فبإمكاننا ببساطة استخدام الفئة الحالية. ولكن نظرًا لأننا نقوم بالتنبؤ بالتسلسل إلى التسلسل ، يتعين علينا كتابة دالة النموذج الخاصة بنا. على الأقل في الوقت الحالي ، لا يأتي API المقدرون مع RNNRegressor خارج الصندوق. لذلك ، لنطرح نموذج RNN الخاص بنا باستخدام وظائف TensorFlow منخفضة المستوى.

LSTM_SIZE = 3 # عدد الطبقات المخفية في كل من خلايا LSTM

# إنشاء نموذج الاستدلال
def simple_rnn (الميزات والأهداف والوضع):
  # 0. إعادة تنسيق شكل الإدخال لتصبح تسلسل
  س = tf.split (ميزات [TIMESERIES_COL] ، N_INPUTS ، 1)
  #print 'x = {}'. التنسيق (x)
    
  # 1. تكوين RNN
  lstm_cell = rnn.BasicLSTMCell (LSTM_SIZE ، forget_bias = 1.0)
  المخرجات ، _ = rnn.static_rnn (lstm_cell ، x ، dtype = tf.float32)

  # شريحة للاحتفاظ فقط بالخلية الأخيرة من RNN
  المخرجات = المخرجات [-1]
  #print 'last outputs = {}'. format (output)
  
  # الإخراج هو نتيجة التنشيط الخطي للطبقة الأخيرة من RNN
  الوزن = tf.Variable (tf.random_normal ([LSTM_SIZE، N_OUTPUTS]))
  التحيز = tf.Variable (tf.random_normal ([N_OUTPUTS]))
  التنبؤات = tf.matmul (المخرجات ، الوزن) + التحيز
    
  # 2. تحديد وظيفة الخسارة للتدريب / التقييم
  #print 'target = {}'. التنسيق (الأهداف)
  #print 'preds = {}'. التنسيق (التنبؤات)
  الخسارة = tf.losses.mean_squared_error (الأهداف والتنبؤات)
  eval_metric_ops = {
      "rmse": tf.metrics.root_mean_squared_error (الأهداف والتنبؤات)
  }
  
  # 3.حدد عملية التدريب / محسن
  train_op = tf.contrib.layers.optimize_loss (
      فقدان = خسارة،
      global_step = tf.contrib.framework.get_global_step ()
      learning_rate = 0.01،
      محسن = "SGD")

  # 4.إنشاء التوقعات
  Forecastions_dict = {"توقع": التنبؤات}
  
  # 5.العودة ModelFnOps
  إرجاع tflearn.ModelFnOps (
      وضع = الوضع،
      التنبؤات = predictions_dict،
      فقدان = خسارة،
      train_op = train_op،
      eval_metric_ops = eval_metric_ops)

أذكر أننا اضطررنا إلى تجميع المدخلات في موتر واحد لتمريرها كميزات من input_fn. الخطوة 0 ببساطة عكس هذه العملية واستعادة قائمة التنسورات.

تتكون الشبكة العصبية المتكررة من BasicLSTMLCell التي تنقل إليها المدخلات. يمكنك العودة المخرجات والولايات. قم بقطعها للاحتفاظ بالخلية الأخيرة فقط من RNN - نحن لا نستخدم أيًا من الحالات السابقة. أبنية أخرى ممكنة. على سبيل المثال ، كان بإمكاني تدريب الشبكة على الحصول على إخراج واحد فقط دائمًا واستخدام الإطارات المتداول. سأتحدث عن كيفية تعديل مثالي للقيام بذلك في نهاية هذه المقالة.

التعليقات الواردة في الكود أعلاه واضحة للغاية فيما يتعلق بالخطوات الأخرى. نحن لا نفعل أي شيء مفاجئ هناك. هذه مشكلة انحدار ، لذلك أنا أستخدم RMSE.

إنشاء تجربة

فئة التجربة هي الفئة الذكية في API Estimators. إنها تعرف كيفية استخدام الوظيفة النموذجية ، ووظائف الإدخال للتدريب والتحقق من الصحة وإجراء أشياء معقولة فيما يتعلق بالتوزيع ، والتوقف المبكر ، وما إلى ذلك. لذلك ، دعونا نوزع القطع الخاصة بنا عليها:

def get_train ():
  عرض read_dataset ('train.csv' ، mode = tf.contrib.learn.ModeKeys.TRAIN)

def get_valid ():
  عرض read_dataset ('valid.csv' ، mode = tf.contrib.learn.ModeKeys.EVAL)

def experience_fn (output_dir):
    # تشغيل التجربة
    إرجاع tflearn.Experiment (
        tflearn.Estimator (model_fn = simple_rnn، model_dir = output_dir)،
        train_input_fn = get_train ()
        eval_input_fn = get_valid ()
        eval_metrics = {
            'rmse': tflearn.MetricSpec (
                metric_fn = metrics.streaming_root_mean_squared_error
            )
        }
    )

shutil.rmtree ('outputdir'، ignore_errors = True) # تبدأ من جديد في كل مرة
learn_runner.run (experiment_fn ، 'outputdir')

التدريب على السحابة

يعمل الرمز أعلاه على جهاز واحد ، وإذا قمت بتجميعه في وحدة Python ، فيمكنك أيضًا إرساله إلى Cloud ML Engine لتدريبه بطريقة خادم:

OUTDIR = ع: // $ {BUCKET} / simplernn / model_trained
JOBNAME = simplernn _ $ (التاريخ -u +٪ y٪ m٪ d_٪ H٪ M٪ S)
المنطقة = لنا-central1
gsutil -m rm -rf $ OUTDIR
وظائف gcloud ml-engine تقدم تدريبات $ JOBNAME \
   - المنطقة = $ المنطقة \
   - اسم الوحدة = trainer.task \
   - حزمة المسار = $ {REPO} / المبسطة / المدرب \
   --job-dir = $ OUTDIR \
   - التدريج دلو = gs: // $ BUCKET \
   - مقياس المستوى = الأساسي \
   - وقت التشغيل - الإصدار = 1.0 \
   - \
   --train_data_paths = "gs: // $ {BUCKET} /train.csv*" \
   --eval_data_paths = "gs: // $ {BUCKET} /valid.csv*" \
   - Output_dir = $ OUTDIR \
   --num_epochs = 100

البديل المشترك: سلسلة زمنية طويلة جدا

في هذه المقالة ، افترضت أن لديك الآلاف من التسلسلات القصيرة (10 عناصر). ماذا لو كان لديك تسلسل طويل جدا؟ على سبيل المثال ، قد يكون لديك سعر السهم أو قراءة درجة الحرارة من جهاز استشعار. في مثل هذه الحالات ، ما يمكنك فعله هو تقسيم التسلسل الطويل الخاص بك إلى سلاسل متتابعة ذات طول ثابت. من الواضح أن هذا الطول تعسفي ، لكن فكر في الأمر على أنه الفاصل "رجعي" لشبكة RNN. إليك رمز TensorFlow الذي سيستغرق تسلسلًا طويلًا وينقسم إلى تسلسلات أصغر متداخلة ذات طول ثابت:

استيراد tensorflow كما TF
استيراد numpy كـ np
def breakup (sess، x، lookback_len):
  N = sess.run (tf.size (x))
  windows = [tf.slice (x، [b]، [lookback_len]) لـ b in xrange (0، N-lookback_len)]
  windows = tf.stack (windows)
  نوافذ العودة

فمثلا:

x = tf.constant (np.arange (1،11 ، dtype = np.float32))
مع tf.Session () كما sess:
    طباعة "الإدخال =" ، x.eval ()
    seqx = تفكك (اختبار ، س ، 5)
    طباعة 'الإخراج =' ، seqx.eval ()

سوف يؤدي إلى:

المدخلات = [1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
الناتج = [[1. 2. 3. 4. 5.]
 [2. 3. 4. 5. 6.]
 [3. 4. 5. 6. 7.]
 [4. 5. 6. 7. 8.]
 [5. 6. 7. 8. 9.]]

بمجرد الحصول على هذه التسلسلات ذات الطول الثابت ، يصبح كل شيء كما كان من قبل.

ترميز سعيد!